Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 621D8200C63 for ; Thu, 11 May 2017 22:44:03 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 60AA3160BB3; Thu, 11 May 2017 20:44:03 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id F2435160BC7 for ; Thu, 11 May 2017 22:44:00 +0200 (CEST) Received: (qmail 528 invoked by uid 500); 11 May 2017 20:44:00 -0000 Mailing-List: contact commits-help@asterixdb.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@asterixdb.apache.org Delivered-To: mailing list commits@asterixdb.apache.org Received: (qmail 472 invoked by uid 99); 11 May 2017 20:44:00 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 11 May 2017 20:44:00 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 03C7FDFFB4; Thu, 11 May 2017 20:44:00 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit From: tillw@apache.org To: commits@asterixdb.apache.org Date: Thu, 11 May 2017 20:44:00 -0000 Message-Id: In-Reply-To: <6a84033a723e489090a2f5ebd339a0d8@git.apache.org> References: <6a84033a723e489090a2f5ebd339a0d8@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] asterixdb git commit: Enable HTTP API processing on NCs archived-at: Thu, 11 May 2017 20:44:03 -0000 Enable HTTP API processing on NCs - Query/Status/Result are answered by NC nodes - other HTTP requests are proxied to the CC node - SessionConfig refactoring – split into config and output (SessionOutput) - TestExecutor now can send http requests do multiple nodes (round robin) Change-Id: I19414a23e163fc4deef9805c8f9089609f1ebe07 Reviewed-on: https://asterix-gerrit.ics.uci.edu/1709 Tested-by: Jenkins Reviewed-by: Michael Blow Integration-Tests: Jenkins Reviewed-by: abdullah alamoudi Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/4671f712 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/4671f712 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/4671f712 Branch: refs/heads/master Commit: 4671f7127c484b805b22ad3ff95c08c47a031d12 Parents: 7a0ba78 Author: Till Westmann Authored: Thu May 11 09:13:57 2017 -0700 Committer: Till Westmann Committed: Thu May 11 13:43:30 2017 -0700 ---------------------------------------------------------------------- .../asterix/translator/IStatementExecutor.java | 26 ++- .../translator/IStatementExecutorFactory.java | 6 +- .../asterix/translator/SessionConfig.java | 78 ++------- .../asterix/translator/SessionOutput.java | 88 +++++++++++ .../apache/asterix/api/common/APIFramework.java | 71 +++++---- .../http/server/AbstractQueryApiServlet.java | 6 +- .../asterix/api/http/server/ApiServlet.java | 8 +- .../api/http/server/NCQueryServiceServlet.java | 94 +++++++++++ .../api/http/server/QueryResultApiServlet.java | 10 +- .../api/http/server/QueryServiceServlet.java | 115 +++++++------- .../api/http/server/QueryStatusApiServlet.java | 4 +- .../asterix/api/http/server/RestApiServlet.java | 23 +-- .../asterix/api/http/server/ResultUtil.java | 55 +++++-- .../api/http/servlet/ServletConstants.java | 3 +- .../asterix/api/java/AsterixJavaClient.java | 8 +- .../asterix/app/cc/CCExtensionManager.java | 12 +- .../message/ExecuteStatementRequestMessage.java | 157 +++++++++++++++++++ .../ExecuteStatementResponseMessage.java | 86 ++++++++++ .../asterix/app/result/ResultPrinter.java | 37 +++-- .../DefaultStatementExecutorFactory.java | 6 +- .../asterix/app/translator/QueryTranslator.java | 72 +++++---- .../hyracks/bootstrap/CCApplication.java | 42 +++-- .../hyracks/bootstrap/NCApplication.java | 26 ++- .../asterix/messaging/NCMessageBroker.java | 29 ++++ .../apache/asterix/utils/FeedOperations.java | 10 +- .../aql/translator/QueryTranslatorTest.java | 6 +- .../asterix/test/common/TestExecutor.java | 40 +++-- .../async-deferred/AsyncDeferredQueries.xml | 52 ++++++ .../resources/runtimets/testsuite_sqlpp.xml | 37 +---- .../common/dataflow/ICcApplicationContext.java | 20 +++ .../common/messaging/api/INCMessageBroker.java | 15 +- .../common/messaging/api/MessageFuture.java | 40 +++++ asterixdb/asterix-external-data/pom.xml | 4 + .../installer/test/AsterixLifecycleIT.java | 1 + .../runtime/utils/CcApplicationContext.java | 12 +- .../src/main/assembly/filter.properties | 3 +- .../src/main/opt/local/conf/cc.conf | 1 + .../testframework/xml/TestSuiteParser.java | 17 +- asterixdb/asterix-yarn/pom.xml | 6 - .../apache/hyracks/http/server/HttpServer.java | 14 +- .../hyracks/http/server/HttpServerHandler.java | 24 ++- .../http/server/HttpServerInitializer.java | 2 +- 42 files changed, 1008 insertions(+), 358 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java index 92c487b..19f0dcc 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java @@ -18,17 +18,24 @@ */ package org.apache.asterix.translator; +import java.io.Serializable; import java.rmi.RemoteException; +import java.util.ArrayList; +import java.util.List; import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.lang.common.statement.Query; import org.apache.asterix.metadata.declared.MetadataProvider; +import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement; +import org.apache.commons.lang3.tuple.Triple; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.client.IClusterInfoCollector; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.dataset.IHyracksDataset; +import org.apache.hyracks.api.dataset.ResultSetId; +import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; /** @@ -54,6 +61,16 @@ public interface IStatementExecutor { ASYNC } + class ResultMetadata implements Serializable { + private static final long serialVersionUID = 1L; + + private final List> resultSets = new ArrayList<>(); + + public List> getResultSets() { + return resultSets; + } + } + public static class Stats { private long count; private long size; @@ -85,12 +102,14 @@ public interface IStatementExecutor { * A Hyracks dataset client object that is used to read the results. * @param resultDelivery * The {@code ResultDelivery} kind required for queries in the list of statements + * @param outMetadata + * a reference to write the metadata of executed queries * @param stats * a reference to write the stats of executed queries * @throws Exception */ void compileAndExecute(IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, - Stats stats) throws Exception; + ResultMetadata outMetadata, Stats stats) throws Exception; /** * Compiles and execute a list of statements, with passing in client context id and context. @@ -101,6 +120,8 @@ public interface IStatementExecutor { * A Hyracks dataset client object that is used to read the results. * @param resultDelivery * The {@code ResultDelivery} kind required for queries in the list of statements + * @param outMetadata + * a reference to write the metadata of executed queries * @param stats * a reference to write the stats of executed queries * @param clientContextId @@ -110,7 +131,8 @@ public interface IStatementExecutor { * @throws Exception */ void compileAndExecute(IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, - Stats stats, String clientContextId, IStatementExecutorContext ctx) throws Exception; + ResultMetadata outMetadata, Stats stats, String clientContextId, IStatementExecutorContext ctx) + throws Exception; /** * rewrites and compiles query into a hyracks job specifications http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorFactory.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorFactory.java index 23365de..b244c0c 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorFactory.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorFactory.java @@ -37,14 +37,14 @@ public interface IStatementExecutorFactory { * * @param statements * Statements to execute - * @param conf - * request configuration + * @param output + * output and request configuration * @param compilationProvider * provides query language related components * @param storageComponentProvider * provides storage related components * @return an implementation of {@code IStatementExecutor} thaxt is used to execute the passed list of statements */ - IStatementExecutor create(ICcApplicationContext appCtx, List statements, SessionConfig conf, + IStatementExecutor create(ICcApplicationContext appCtx, List statements, SessionOutput output, ILangCompilationProvider compilationProvider, IStorageComponentProvider storageComponentProvider); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java index a637e2f..cfd4e87 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java @@ -18,13 +18,10 @@ */ package org.apache.asterix.translator; -import java.io.PrintWriter; +import java.io.Serializable; import java.util.HashMap; import java.util.Map; -import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; -import org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksAppendable; - /** * SessionConfig captures several different parameters for controlling * the execution of an APIFramework call. @@ -38,8 +35,10 @@ import org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksAppendab * execution output - LOSSLESS_JSON, CSV, etc. *
  • It allows you to specify output format-specific parameters. */ +public class SessionConfig implements Serializable { + + private static final long serialVersionUID = 1L; -public class SessionConfig { /** * Used to specify the output format for the primary execution. */ @@ -105,53 +104,25 @@ public class SessionConfig { */ public static final String FORMAT_QUOTE_RECORD = "quote-record"; - @FunctionalInterface - public interface ResultDecorator { - AlgebricksAppendable append(AlgebricksAppendable app) throws AlgebricksException; - } - - @FunctionalInterface - public interface ResultAppender { - AlgebricksAppendable append(AlgebricksAppendable app, String str) throws AlgebricksException; - } + // Output format. + private final OutputFormat fmt; // Standard execution flags. private final boolean executeQuery; private final boolean generateJobSpec; private final boolean optimize; - // Output path for primary execution. - private final PrintWriter out; - - // Output format. - private final OutputFormat fmt; - - private final ResultDecorator preResultDecorator; - private final ResultDecorator postResultDecorator; - private final ResultAppender handleAppender; - private final ResultAppender statusAppender; - // Flags. private final Map flags; - public SessionConfig(PrintWriter out, OutputFormat fmt, ResultDecorator preResultDecorator, - ResultDecorator postResultDecorator, ResultAppender handleAppender, ResultAppender statusAppender) { - this(out, fmt, preResultDecorator, postResultDecorator, handleAppender, statusAppender, - true, true, true); - } - - public SessionConfig(PrintWriter out, OutputFormat fmt, boolean optimize, boolean executeQuery, - boolean generateJobSpec) { - this(out, fmt, null, null, null, null, optimize, executeQuery, generateJobSpec); + public SessionConfig(OutputFormat fmt) { + this(fmt, true, true, true); } /** * Create a SessionConfig object with all optional values set to defaults: * - All format flags set to "false". * - All out-of-band outputs set to "false". - * - * @param out - * PrintWriter for execution output. * @param fmt * Output format for execution output. * @param optimize @@ -160,17 +131,9 @@ public class SessionConfig { * Whether to execute the query or not. * @param generateJobSpec * Whether to generate the Hyracks job specification (if - * false, job cannot be executed). */ - public SessionConfig(PrintWriter out, OutputFormat fmt, ResultDecorator preResultDecorator, - ResultDecorator postResultDecorator, ResultAppender handleAppender, ResultAppender statusAppender, - boolean optimize, boolean executeQuery, boolean generateJobSpec) { - this.out = out; + public SessionConfig(OutputFormat fmt, boolean optimize, boolean executeQuery, boolean generateJobSpec) { this.fmt = fmt; - this.preResultDecorator = preResultDecorator; - this.postResultDecorator = postResultDecorator; - this.handleAppender = handleAppender; - this.statusAppender = statusAppender; this.optimize = optimize; this.executeQuery = executeQuery; this.generateJobSpec = generateJobSpec; @@ -178,35 +141,12 @@ public class SessionConfig { } /** - * Retrieve the PrintWriter to produce output to. - */ - public PrintWriter out() { - return this.out; - } - - /** * Retrieve the OutputFormat for this execution. */ public OutputFormat fmt() { return this.fmt; } - public AlgebricksAppendable resultPrefix(AlgebricksAppendable app) throws AlgebricksException { - return this.preResultDecorator != null ? this.preResultDecorator.append(app) : app; - } - - public AlgebricksAppendable resultPostfix(AlgebricksAppendable app) throws AlgebricksException { - return this.postResultDecorator != null ? this.postResultDecorator.append(app) : app; - } - - public AlgebricksAppendable appendHandle(AlgebricksAppendable app, String handle) throws AlgebricksException { - return this.handleAppender != null ? this.handleAppender.append(app, handle) : app; - } - - public AlgebricksAppendable appendStatus(AlgebricksAppendable app, String status) throws AlgebricksException { - return this.statusAppender != null ? this.statusAppender.append(app, status) : app; - } - /** * Retrieve the value of the "execute query" flag. */ http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionOutput.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionOutput.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionOutput.java new file mode 100644 index 0000000..b559df8 --- /dev/null +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionOutput.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.asterix.translator; + +import java.io.PrintWriter; + +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksAppendable; + +public class SessionOutput { + private final SessionConfig config; + + // Output path for primary execution. + private final PrintWriter out; + + private final SessionOutput.ResultDecorator preResultDecorator; + private final SessionOutput.ResultDecorator postResultDecorator; + private final SessionOutput.ResultAppender handleAppender; + private final SessionOutput.ResultAppender statusAppender; + + public SessionOutput(SessionConfig config, PrintWriter out) { + this(config, out, null, null, null, null); + } + + public SessionOutput(SessionConfig config, PrintWriter out, ResultDecorator preResultDecorator, + ResultDecorator postResultDecorator, ResultAppender handleAppender, ResultAppender statusAppender) { + this.config = config; + this.out = out; + this.preResultDecorator = preResultDecorator; + this.postResultDecorator = postResultDecorator; + this.handleAppender = handleAppender; + this.statusAppender = statusAppender; + } + + /** + * Retrieve the PrintWriter to produce output to. + */ + public PrintWriter out() { + return this.out; + } + + public AlgebricksAppendable resultPrefix(AlgebricksAppendable app) throws AlgebricksException { + return this.preResultDecorator != null ? this.preResultDecorator.append(app) : app; + } + + public AlgebricksAppendable resultPostfix(AlgebricksAppendable app) throws AlgebricksException { + return this.postResultDecorator != null ? this.postResultDecorator.append(app) : app; + } + + public AlgebricksAppendable appendHandle(AlgebricksAppendable app, String handle) throws AlgebricksException { + return this.handleAppender != null ? this.handleAppender.append(app, handle) : app; + } + + public AlgebricksAppendable appendStatus(AlgebricksAppendable app, String status) throws AlgebricksException { + return this.statusAppender != null ? this.statusAppender.append(app, status) : app; + } + + public SessionConfig config() { + return config; + } + + @FunctionalInterface + public interface ResultDecorator { + AlgebricksAppendable append(AlgebricksAppendable app) throws AlgebricksException; + } + + @FunctionalInterface + public interface ResultAppender { + AlgebricksAppendable append(AlgebricksAppendable app, String str) throws AlgebricksException; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java index b12935d..583302b 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java @@ -67,6 +67,7 @@ import org.apache.asterix.transaction.management.service.transaction.JobIdFactor import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement; import org.apache.asterix.translator.IStatementExecutor.Stats; import org.apache.asterix.translator.SessionConfig; +import org.apache.asterix.translator.SessionOutput; import org.apache.asterix.utils.ResourceUtils; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; @@ -152,31 +153,33 @@ public class APIFramework { } } - private void printPlanPrefix(SessionConfig conf, String planName) { - if (conf.is(SessionConfig.FORMAT_HTML)) { - conf.out().println("

    " + planName + ":

    "); - conf.out().println("
    ");
    +    private void printPlanPrefix(SessionOutput output, String planName) {
    +        if (output.config().is(SessionConfig.FORMAT_HTML)) {
    +            output.out().println("

    " + planName + ":

    "); + output.out().println("
    ");
             } else {
    -            conf.out().println("----------" + planName + ":");
    +            output.out().println("----------" + planName + ":");
             }
         }
     
    -    private void printPlanPostfix(SessionConfig conf) {
    -        if (conf.is(SessionConfig.FORMAT_HTML)) {
    -            conf.out().println("
    "); + private void printPlanPostfix(SessionOutput output) { + if (output.config().is(SessionConfig.FORMAT_HTML)) { + output.out().println("
    "); } } public Pair reWriteQuery(List declaredFunctions, - MetadataProvider metadataProvider, IReturningStatement q, SessionConfig conf) throws CompilationException { + MetadataProvider metadataProvider, IReturningStatement q, SessionOutput output) + throws CompilationException { if (q == null) { return null; } + SessionConfig conf = output.config(); if (!conf.is(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS) && conf.is(SessionConfig.OOB_EXPR_TREE)) { - conf.out().println(); - printPlanPrefix(conf, "Expression tree"); - q.accept(astPrintVisitorFactory.createLangVisitor(conf.out()), 0); - printPlanPostfix(conf); + output.out().println(); + printPlanPrefix(output, "Expression tree"); + q.accept(astPrintVisitorFactory.createLangVisitor(output.out()), 0); + printPlanPostfix(output); } IQueryRewriter rw = rewriterFactory.createQueryRewriter(); rw.rewrite(declaredFunctions, q, metadataProvider, new LangRewritingContext(q.getVarCounter())); @@ -184,17 +187,18 @@ public class APIFramework { } public JobSpecification compileQuery(IClusterInfoCollector clusterInfoCollector, MetadataProvider metadataProvider, - Query rwQ, int varCounter, String outputDatasetName, SessionConfig conf, ICompiledDmlStatement statement) + Query rwQ, int varCounter, String outputDatasetName, SessionOutput output, ICompiledDmlStatement statement) throws AlgebricksException, RemoteException, ACIDException { + SessionConfig conf = output.config(); if (!conf.is(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS) && conf.is(SessionConfig.OOB_REWRITTEN_EXPR_TREE)) { - conf.out().println(); + output.out().println(); - printPlanPrefix(conf, "Rewritten expression tree"); + printPlanPrefix(output, "Rewritten expression tree"); if (rwQ != null) { - rwQ.accept(astPrintVisitorFactory.createLangVisitor(conf.out()), 0); + rwQ.accept(astPrintVisitorFactory.createLangVisitor(output.out()), 0); } - printPlanPostfix(conf); + printPlanPostfix(output); } org.apache.asterix.common.transactions.JobId asterixJobId = JobIdFactory.generateJobId(); @@ -211,14 +215,14 @@ public class APIFramework { } if (!conf.is(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS) && conf.is(SessionConfig.OOB_LOGICAL_PLAN)) { - conf.out().println(); + output.out().println(); - printPlanPrefix(conf, "Logical plan"); + printPlanPrefix(output, "Logical plan"); if (rwQ != null || (statement != null && statement.getKind() == Statement.Kind.LOAD)) { - LogicalOperatorPrettyPrintVisitor pvisitor = new LogicalOperatorPrettyPrintVisitor(conf.out()); + LogicalOperatorPrettyPrintVisitor pvisitor = new LogicalOperatorPrettyPrintVisitor(output.out()); PlanPrettyPrinter.printPlan(plan, pvisitor, 0); } - printPlanPostfix(conf); + printPlanPostfix(output); } CompilerProperties compilerProperties = metadataProvider.getApplicationContext().getCompilerProperties(); int frameSize = compilerProperties.getFrameSize(); @@ -264,15 +268,16 @@ public class APIFramework { if (conf.is(SessionConfig.OOB_OPTIMIZED_LOGICAL_PLAN)) { if (conf.is(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS)) { // For Optimizer tests. - AlgebricksAppendable buffer = new AlgebricksAppendable(conf.out()); + AlgebricksAppendable buffer = new AlgebricksAppendable(output.out()); PlanPrettyPrinter.printPhysicalOps(plan, buffer, 0); } else { - printPlanPrefix(conf, "Optimized logical plan"); + printPlanPrefix(output, "Optimized logical plan"); if (rwQ != null || (statement != null && statement.getKind() == Statement.Kind.LOAD)) { - LogicalOperatorPrettyPrintVisitor pvisitor = new LogicalOperatorPrettyPrintVisitor(conf.out()); + LogicalOperatorPrettyPrintVisitor pvisitor = + new LogicalOperatorPrettyPrintVisitor(output.out()); PlanPrettyPrinter.printPlan(plan, pvisitor, 0); } - printPlanPostfix(conf); + printPlanPostfix(output); } } } @@ -280,7 +285,7 @@ public class APIFramework { try { LogicalOperatorPrettyPrintVisitor pvisitor = new LogicalOperatorPrettyPrintVisitor(); PlanPrettyPrinter.printPlan(plan, pvisitor, 0); - ResultUtil.printResults(metadataProvider.getApplicationContext(), pvisitor.get().toString(), conf, + ResultUtil.printResults(metadataProvider.getApplicationContext(), pvisitor.get().toString(), output, new Stats(), null); return null; } catch (IOException e) { @@ -336,17 +341,17 @@ public class APIFramework { } if (conf.is(SessionConfig.OOB_HYRACKS_JOB)) { - printPlanPrefix(conf, "Hyracks job"); + printPlanPrefix(output, "Hyracks job"); if (rwQ != null) { try { - conf.out().println( + output.out().println( new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(spec.toJSON())); } catch (IOException e) { throw new AlgebricksException(e); } - conf.out().println(spec.getUserConstraints()); + output.out().println(spec.getUserConstraints()); } - printPlanPostfix(conf); + printPlanPostfix(output); } return spec; } @@ -459,9 +464,7 @@ public class APIFramework { // Gets the frame limit. private static int getFrameLimit(String parameterName, String parameter, long memBudgetInConfiguration, - int frameSize, - int minFrameLimit) - throws AlgebricksException { + int frameSize, int minFrameLimit) throws AlgebricksException { IOptionType longBytePropertyInterpreter = OptionTypes.LONG_BYTE_UNIT; long memBudget = parameter == null ? memBudgetInConfiguration : longBytePropertyInterpreter.parse(parameter); int frameLimit = (int) (memBudget / frameSize); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java index a9c4b39..a4e72f7 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java @@ -26,7 +26,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentMap; import org.apache.asterix.app.result.ResultReader; -import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.common.api.IApplicationContext; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.hyracks.api.client.IHyracksClientConnection; @@ -35,7 +35,7 @@ import org.apache.hyracks.client.dataset.HyracksDataset; import org.apache.hyracks.http.server.AbstractServlet; public class AbstractQueryApiServlet extends AbstractServlet { - protected final ICcApplicationContext appCtx; + protected final IApplicationContext appCtx; public enum ResultFields { REQUEST_ID("requestID"), @@ -93,7 +93,7 @@ public class AbstractQueryApiServlet extends AbstractServlet { } } - AbstractQueryApiServlet(ICcApplicationContext appCtx, ConcurrentMap ctx, String[] paths) { + AbstractQueryApiServlet(IApplicationContext appCtx, ConcurrentMap ctx, String[] paths) { super(ctx, paths); this.appCtx = appCtx; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java index 3384332..7874aa3 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java @@ -50,6 +50,7 @@ import org.apache.asterix.translator.IStatementExecutor; import org.apache.asterix.translator.IStatementExecutorFactory; import org.apache.asterix.translator.SessionConfig; import org.apache.asterix.translator.SessionConfig.OutputFormat; +import org.apache.asterix.translator.SessionOutput; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.dataset.IHyracksDataset; import org.apache.hyracks.client.dataset.HyracksDataset; @@ -137,19 +138,20 @@ public class ApiServlet extends AbstractServlet { } IParser parser = parserFactory.createParser(query); List aqlStatements = parser.parse(); - SessionConfig sessionConfig = new SessionConfig(out, format, true, isSet(executeQuery), true); + SessionConfig sessionConfig = new SessionConfig(format, true, isSet(executeQuery), true); sessionConfig.set(SessionConfig.FORMAT_HTML, true); sessionConfig.set(SessionConfig.FORMAT_CSV_HEADER, csvAndHeader); sessionConfig.set(SessionConfig.FORMAT_WRAPPER_ARRAY, isSet(wrapperArray)); sessionConfig.setOOBData(isSet(printExprParam), isSet(printRewrittenExprParam), isSet(printLogicalPlanParam), isSet(printOptimizedLogicalPlanParam), isSet(printJob)); + SessionOutput sessionOutput = new SessionOutput(sessionConfig, out); MetadataManager.INSTANCE.init(); - IStatementExecutor translator = statementExectorFactory.create(appCtx, aqlStatements, sessionConfig, + IStatementExecutor translator = statementExectorFactory.create(appCtx, aqlStatements, sessionOutput, compilationProvider, componentProvider); double duration; long startTime = System.currentTimeMillis(); translator.compileAndExecute(hcc, hds, IStatementExecutor.ResultDelivery.IMMEDIATE, - new IStatementExecutor.Stats()); + null, new IStatementExecutor.Stats()); long endTime = System.currentTimeMillis(); duration = (endTime - startTime) / 1000.00; out.println(HTML_STATEMENT_SEPARATOR); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java new file mode 100644 index 0000000..2b70685 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.asterix.api.http.server; + +import java.util.concurrent.ConcurrentMap; + +import org.apache.asterix.algebra.base.ILangExtension; +import org.apache.asterix.app.message.ExecuteStatementRequestMessage; +import org.apache.asterix.app.message.ExecuteStatementResponseMessage; +import org.apache.asterix.app.result.ResultReader; +import org.apache.asterix.common.api.IApplicationContext; +import org.apache.asterix.common.messaging.api.INCMessageBroker; +import org.apache.asterix.common.messaging.api.MessageFuture; +import org.apache.asterix.om.types.ARecordType; +import org.apache.asterix.translator.IStatementExecutor; +import org.apache.asterix.translator.SessionOutput; +import org.apache.commons.lang3.tuple.Triple; +import org.apache.hyracks.api.application.INCServiceContext; +import org.apache.hyracks.api.dataset.ResultSetId; +import org.apache.hyracks.api.job.JobId; + +/** + * Query service servlet that can run on NC nodes. + * Delegates query execution to CC, then serves the result. + */ +public class NCQueryServiceServlet extends QueryServiceServlet { + public NCQueryServiceServlet(ConcurrentMap ctx, String[] paths, IApplicationContext appCtx, + ILangExtension.Language queryLanguage) { + super(ctx, paths, appCtx, queryLanguage, null, null, null); + } + + @Override + protected void executeStatement(String statementsText, SessionOutput sessionOutput, + IStatementExecutor.ResultDelivery delivery, IStatementExecutor.Stats stats, RequestParameters param, + String handleUrl, long[] outExecStartEnd) throws Exception { + // Running on NC -> send 'execute' message to CC + INCServiceContext ncCtx = (INCServiceContext) serviceCtx; + INCMessageBroker ncMb = (INCMessageBroker) ncCtx.getMessageBroker(); + IStatementExecutor.ResultDelivery ccDelivery = delivery == IStatementExecutor.ResultDelivery.IMMEDIATE + ? IStatementExecutor.ResultDelivery.DEFERRED : delivery; + ExecuteStatementResponseMessage responseMsg; + MessageFuture responseFuture = ncMb.registerMessageFuture(); + try { + ExecuteStatementRequestMessage requestMsg = + new ExecuteStatementRequestMessage(ncCtx.getNodeId(), responseFuture.getFutureId(), queryLanguage, + statementsText, sessionOutput.config(), ccDelivery, param.clientContextID, handleUrl); + outExecStartEnd[0] = System.nanoTime(); + ncMb.sendMessageToCC(requestMsg); + responseMsg = (ExecuteStatementResponseMessage) responseFuture.get( + ExecuteStatementResponseMessage.DEFAULT_TIMEOUT_MILLIS, java.util.concurrent.TimeUnit.MILLISECONDS); + outExecStartEnd[1] = System.nanoTime(); + } finally { + ncMb.deregisterMessageFuture(responseFuture.getFutureId()); + } + + Throwable err = responseMsg.getError(); + if (err != null) { + if (err instanceof Error) { + throw (Error) err; + } else if (err instanceof Exception) { + throw (Exception) err; + } else { + throw new Exception(err.toString(), err); + } + } + + IStatementExecutor.ResultMetadata resultMetadata = responseMsg.getMetadata(); + if (delivery == IStatementExecutor.ResultDelivery.IMMEDIATE && !resultMetadata.getResultSets().isEmpty()) { + for (Triple rsmd : resultMetadata.getResultSets()) { + ResultReader resultReader = new ResultReader(getHyracksDataset(), rsmd.getLeft(), rsmd.getMiddle()); + ResultUtil.printResults(appCtx, resultReader, sessionOutput, stats, rsmd.getRight()); + } + } else { + sessionOutput.out().append(responseMsg.getResult()); + } + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java index 401f55e..42e23ba 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java @@ -25,9 +25,9 @@ import java.util.logging.Logger; import org.apache.asterix.app.result.ResultHandle; import org.apache.asterix.app.result.ResultReader; -import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.common.api.IApplicationContext; import org.apache.asterix.translator.IStatementExecutor.Stats; -import org.apache.asterix.translator.SessionConfig; +import org.apache.asterix.translator.SessionOutput; import org.apache.hyracks.api.dataset.DatasetJobRecord; import org.apache.hyracks.api.dataset.IHyracksDataset; import org.apache.hyracks.api.exceptions.ErrorCode; @@ -41,7 +41,7 @@ import io.netty.handler.codec.http.HttpResponseStatus; public class QueryResultApiServlet extends AbstractQueryApiServlet { private static final Logger LOGGER = Logger.getLogger(QueryResultApiServlet.class.getName()); - public QueryResultApiServlet(ConcurrentMap ctx, String[] paths, ICcApplicationContext appCtx) { + public QueryResultApiServlet(ConcurrentMap ctx, String[] paths, IApplicationContext appCtx) { super(appCtx, ctx, paths); } @@ -94,8 +94,8 @@ public class QueryResultApiServlet extends AbstractQueryApiServlet { // way to send the same OutputFormat value here as was // originally determined there. Need to save this value on // some object that we can obtain here. - SessionConfig sessionConfig = RestApiServlet.initResponse(request, response); - ResultUtil.printResults(appCtx, resultReader, sessionConfig, new Stats(), null); + SessionOutput sessionOutput = RestApiServlet.initResponse(request, response); + ResultUtil.printResults(appCtx, resultReader, sessionOutput, new Stats(), null); } catch (HyracksDataException e) { final int errorCode = e.getErrorCode(); if (ErrorCode.NO_RESULTSET == errorCode) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java index 20bffc4..c45f24a 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java @@ -27,8 +27,9 @@ import java.util.concurrent.ConcurrentMap; import java.util.logging.Level; import java.util.logging.Logger; -import org.apache.asterix.api.http.ctx.StatementExecutorContext; +import org.apache.asterix.algebra.base.ILangExtension; import org.apache.asterix.api.http.servlet.ServletConstants; +import org.apache.asterix.common.api.IApplicationContext; import org.apache.asterix.common.api.IClusterManagementWork; import org.apache.asterix.common.config.GlobalConfig; import org.apache.asterix.common.context.IStorageComponentProvider; @@ -46,8 +47,8 @@ import org.apache.asterix.translator.IStatementExecutor.Stats; import org.apache.asterix.translator.IStatementExecutorContext; import org.apache.asterix.translator.IStatementExecutorFactory; import org.apache.asterix.translator.SessionConfig; -import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; -import org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksAppendable; +import org.apache.asterix.translator.SessionOutput; +import org.apache.hyracks.api.application.IServiceContext; import org.apache.hyracks.http.api.IServletRequest; import org.apache.hyracks.http.api.IServletResponse; import org.apache.hyracks.http.server.utils.HttpUtil; @@ -66,19 +67,23 @@ import io.netty.handler.codec.http.HttpResponseStatus; public class QueryServiceServlet extends AbstractQueryApiServlet { private static final Logger LOGGER = Logger.getLogger(QueryServiceServlet.class.getName()); + protected final ILangExtension.Language queryLanguage; private final ILangCompilationProvider compilationProvider; private final IStatementExecutorFactory statementExecutorFactory; private final IStorageComponentProvider componentProvider; - private final IStatementExecutorContext queryCtx = new StatementExecutorContext(); + private final IStatementExecutorContext queryCtx; + protected final IServiceContext serviceCtx; - public QueryServiceServlet(ConcurrentMap ctx, String[] paths, ICcApplicationContext appCtx, - ILangCompilationProvider compilationProvider, IStatementExecutorFactory statementExecutorFactory, - IStorageComponentProvider componentProvider) { + public QueryServiceServlet(ConcurrentMap ctx, String[] paths, IApplicationContext appCtx, + ILangExtension.Language queryLanguage, ILangCompilationProvider compilationProvider, + IStatementExecutorFactory statementExecutorFactory, IStorageComponentProvider componentProvider) { super(appCtx, ctx, paths); + this.queryLanguage = queryLanguage; this.compilationProvider = compilationProvider; this.statementExecutorFactory = statementExecutorFactory; this.componentProvider = componentProvider; - ctx.put(ServletConstants.RUNNING_QUERIES_ATTR, queryCtx); + this.queryCtx = (IStatementExecutorContext) ctx.get(ServletConstants.RUNNING_QUERIES_ATTR); + this.serviceCtx = (IServiceContext) ctx.get(ServletConstants.SERVICE_CONTEXT_ATTR); } @Override @@ -235,40 +240,22 @@ public class QueryServiceServlet extends AbstractQueryApiServlet { return SessionConfig.OutputFormat.CLEAN_JSON; } - private static SessionConfig createSessionConfig(RequestParameters param, String handleUrl, + private static SessionOutput createSessionOutput(RequestParameters param, String handleUrl, PrintWriter resultWriter) { - SessionConfig.ResultDecorator resultPrefix = new SessionConfig.ResultDecorator() { - int resultNo = -1; - - @Override - public AlgebricksAppendable append(AlgebricksAppendable app) throws AlgebricksException { - app.append("\t\""); - app.append(ResultFields.RESULTS.str()); - if (resultNo >= 0) { - app.append('-').append(String.valueOf(resultNo)); - } - ++resultNo; - app.append("\": "); - return app; - } - }; - - SessionConfig.ResultDecorator resultPostfix = app -> app.append("\t,\n"); - SessionConfig.ResultAppender appendHandle = (app, handle) -> app.append("\t\"") - .append(ResultFields.HANDLE.str()).append("\": \"").append(handleUrl).append(handle).append("\",\n"); - SessionConfig.ResultAppender appendStatus = (app, status) -> app.append("\t\"") - .append(ResultFields.STATUS.str()).append("\": \"").append(status).append("\",\n"); + SessionOutput.ResultDecorator resultPrefix = ResultUtil.createPreResultDecorator(); + SessionOutput.ResultDecorator resultPostfix = ResultUtil.createPostResultDecorator(); + SessionOutput.ResultAppender appendHandle = ResultUtil.createResultHandleAppender(handleUrl); + SessionOutput.ResultAppender appendStatus = ResultUtil.createResultStatusAppender(); SessionConfig.OutputFormat format = getFormat(param.format); - SessionConfig sessionConfig = - new SessionConfig(resultWriter, format, resultPrefix, resultPostfix, appendHandle, appendStatus); + SessionConfig sessionConfig = new SessionConfig(format); sessionConfig.set(SessionConfig.FORMAT_WRAPPER_ARRAY, true); sessionConfig.set(SessionConfig.FORMAT_INDENT_JSON, param.pretty); sessionConfig.set(SessionConfig.FORMAT_QUOTE_RECORD, format != SessionConfig.OutputFormat.CLEAN_JSON && format != SessionConfig.OutputFormat.LOSSLESS_JSON); sessionConfig.set(SessionConfig.FORMAT_CSV_HEADER, format == SessionConfig.OutputFormat.CSV && "present".equals(getParameterValue(param.format, Attribute.HEADER.str()))); - return sessionConfig; + return new SessionOutput(sessionConfig, resultWriter, resultPrefix, resultPostfix, appendHandle, appendStatus); } private static void printClientContextID(PrintWriter pw, RequestParameters params) { @@ -406,13 +393,13 @@ public class QueryServiceServlet extends AbstractQueryApiServlet { ResultDelivery delivery = parseResultDelivery(param.mode); String handleUrl = getHandleUrl(param.host, param.path, delivery); - SessionConfig sessionConfig = createSessionConfig(param, handleUrl, resultWriter); + SessionOutput sessionOutput = createSessionOutput(param, handleUrl, resultWriter); + SessionConfig sessionConfig = sessionOutput.config(); HttpUtil.setContentType(response, HttpUtil.ContentType.APPLICATION_JSON, HttpUtil.Encoding.UTF8); HttpResponseStatus status = HttpResponseStatus.OK; Stats stats = new Stats(); - long execStart = -1; - long execEnd = -1; + long[] execStartEnd = new long[] { -1, -1 }; resultWriter.print("{\n"); printRequestId(resultWriter); @@ -420,46 +407,33 @@ public class QueryServiceServlet extends AbstractQueryApiServlet { printSignature(resultWriter); printType(resultWriter, sessionConfig); try { - final IClusterManagementWork.ClusterState clusterState = ClusterStateManager.INSTANCE.getState(); - if (clusterState != IClusterManagementWork.ClusterState.ACTIVE) { - // using a plain IllegalStateException here to get into the right catch clause for a 500 - throw new IllegalStateException("Cannot execute request, cluster is " + clusterState); - } if (param.statement == null || param.statement.isEmpty()) { throw new AsterixException("Empty request, no statement provided"); } - IParser parser = compilationProvider.getParserFactory().createParser(param.statement + ";"); - List statements = parser.parse(); - MetadataManager.INSTANCE.init(); - IStatementExecutor translator = - statementExecutorFactory.create(appCtx, statements, sessionConfig, compilationProvider, - componentProvider); - execStart = System.nanoTime(); - translator.compileAndExecute(getHyracksClientConnection(), getHyracksDataset(), delivery, stats, - param.clientContextID, queryCtx); - execEnd = System.nanoTime(); + String statementsText = param.statement + ";"; + executeStatement(statementsText, sessionOutput, delivery, stats, param, handleUrl, execStartEnd); if (ResultDelivery.IMMEDIATE == delivery || ResultDelivery.DEFERRED == delivery) { - ResultUtil.printStatus(sessionConfig, ResultStatus.SUCCESS); + ResultUtil.printStatus(sessionOutput, ResultStatus.SUCCESS); } } catch (AsterixException | TokenMgrError | org.apache.asterix.aqlplus.parser.TokenMgrError pe) { GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, pe.getMessage(), pe); ResultUtil.printError(resultWriter, pe); - ResultUtil.printStatus(sessionConfig, ResultStatus.FATAL); + ResultUtil.printStatus(sessionOutput, ResultStatus.FATAL); status = HttpResponseStatus.BAD_REQUEST; } catch (Exception e) { - GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, e.getMessage(), e); + GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, e.toString(), e); ResultUtil.printError(resultWriter, e); - ResultUtil.printStatus(sessionConfig, ResultStatus.FATAL); + ResultUtil.printStatus(sessionOutput, ResultStatus.FATAL); status = HttpResponseStatus.INTERNAL_SERVER_ERROR; } finally { - if (execStart == -1) { - execEnd = -1; - } else if (execEnd == -1) { - execEnd = System.nanoTime(); + if (execStartEnd[0] == -1) { + execStartEnd[1] = -1; + } else if (execStartEnd[1] == -1) { + execStartEnd[1] = System.nanoTime(); } } - printMetrics(resultWriter, System.nanoTime() - elapsedStart, execEnd - execStart, stats.getCount(), - stats.getSize()); + printMetrics(resultWriter, System.nanoTime() - elapsedStart, execStartEnd[1] - execStartEnd[0], + stats.getCount(), stats.getSize()); resultWriter.print("}\n"); resultWriter.flush(); String result = stringWriter.toString(); @@ -472,4 +446,23 @@ public class QueryServiceServlet extends AbstractQueryApiServlet { LOGGER.warning("Error flushing output writer"); } } + + protected void executeStatement(String statementsText, SessionOutput sessionOutput, ResultDelivery delivery, + IStatementExecutor.Stats stats, RequestParameters param, String handleUrl, long[] outExecStartEnd) + throws Exception { + IClusterManagementWork.ClusterState clusterState = ClusterStateManager.INSTANCE.getState(); + if (clusterState != IClusterManagementWork.ClusterState.ACTIVE) { + // using a plain IllegalStateException here to get into the right catch clause for a 500 + throw new IllegalStateException("Cannot execute request, cluster is " + clusterState); + } + IParser parser = compilationProvider.getParserFactory().createParser(statementsText); + List statements = parser.parse(); + MetadataManager.INSTANCE.init(); + IStatementExecutor translator = statementExecutorFactory.create((ICcApplicationContext) appCtx, statements, + sessionOutput, compilationProvider, componentProvider); + outExecStartEnd[0] = System.nanoTime(); + translator.compileAndExecute(getHyracksClientConnection(), getHyracksDataset(), delivery, null, stats, + param.clientContextID, queryCtx); + outExecStartEnd[1] = System.nanoTime(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java index d0c574e..71dddc0 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java @@ -29,7 +29,7 @@ import java.util.logging.Logger; import org.apache.asterix.app.result.ResultHandle; import org.apache.asterix.app.result.ResultReader; -import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.common.api.IApplicationContext; import org.apache.hyracks.api.dataset.DatasetJobRecord; import org.apache.hyracks.api.dataset.IHyracksDataset; import org.apache.hyracks.http.api.IServletRequest; @@ -41,7 +41,7 @@ import io.netty.handler.codec.http.HttpResponseStatus; public class QueryStatusApiServlet extends AbstractQueryApiServlet { private static final Logger LOGGER = Logger.getLogger(QueryStatusApiServlet.class.getName()); - public QueryStatusApiServlet(ConcurrentMap ctx, String[] paths, ICcApplicationContext appCtx) { + public QueryStatusApiServlet(ConcurrentMap ctx, String[] paths, IApplicationContext appCtx) { super(appCtx, ctx, paths); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java index e339ba9..6b1e408 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java @@ -45,6 +45,7 @@ import org.apache.asterix.translator.IStatementExecutor.ResultDelivery; import org.apache.asterix.translator.IStatementExecutorFactory; import org.apache.asterix.translator.SessionConfig; import org.apache.asterix.translator.SessionConfig.OutputFormat; +import org.apache.asterix.translator.SessionOutput; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.dataset.IHyracksDataset; import org.apache.hyracks.client.dataset.HyracksDataset; @@ -84,7 +85,7 @@ public abstract class RestApiServlet extends AbstractServlet { * SessionConfig with the appropriate output writer and output-format * based on the Accept: header and other servlet parameters. */ - static SessionConfig initResponse(IServletRequest request, IServletResponse response) throws IOException { + static SessionOutput initResponse(IServletRequest request, IServletResponse response) throws IOException { HttpUtil.setContentType(response, HttpUtil.ContentType.TEXT_PLAIN, HttpUtil.Encoding.UTF8); // CLEAN_JSON output is the default; most generally useful for a // programmatic HTTP API @@ -114,9 +115,9 @@ public abstract class RestApiServlet extends AbstractServlet { format = OutputFormat.LOSSLESS_JSON; } - SessionConfig.ResultAppender appendHandle = (app, handle) -> app.append("{ \"").append("handle") + SessionOutput.ResultAppender appendHandle = (app, handle) -> app.append("{ \"").append("handle") .append("\":" + " \"").append(handle).append("\" }"); - SessionConfig sessionConfig = new SessionConfig(response.writer(), format, null, null, appendHandle, null); + SessionConfig sessionConfig = new SessionConfig(format); // If it's JSON or ADM, check for the "wrapper-array" flag. Default is // "true" for JSON and "false" for ADM. (Not applicable for CSV.) @@ -152,7 +153,7 @@ public abstract class RestApiServlet extends AbstractServlet { default: throw new IOException("Unknown format " + format); } - return sessionConfig; + return new SessionOutput(sessionConfig, response.writer(), null, null, appendHandle, null); } @Override @@ -171,9 +172,9 @@ public abstract class RestApiServlet extends AbstractServlet { // enable cross-origin resource sharing response.setHeader("Access-Control-Allow-Origin", "*"); response.setHeader("Access-Control-Allow-Headers", "Origin, X-Requested-With, Content-Type, Accept"); - SessionConfig sessionConfig = initResponse(request, response); + SessionOutput sessionOutput = initResponse(request, response); QueryTranslator.ResultDelivery resultDelivery = whichResultDelivery(request); - doHandle(response, query, sessionConfig, resultDelivery); + doHandle(response, query, sessionOutput, resultDelivery); } catch (Exception e) { response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR); LOGGER.log(Level.WARNING, "Failure handling request", e); @@ -181,7 +182,7 @@ public abstract class RestApiServlet extends AbstractServlet { } } - private void doHandle(IServletResponse response, String query, SessionConfig sessionConfig, + private void doHandle(IServletResponse response, String query, SessionOutput sessionOutput, ResultDelivery resultDelivery) throws JsonProcessingException { try { response.setStatus(HttpResponseStatus.OK); @@ -201,20 +202,20 @@ public abstract class RestApiServlet extends AbstractServlet { List aqlStatements = parser.parse(); validate(aqlStatements); MetadataManager.INSTANCE.init(); - IStatementExecutor translator = statementExecutorFactory.create(appCtx, aqlStatements, sessionConfig, + IStatementExecutor translator = statementExecutorFactory.create(appCtx, aqlStatements, sessionOutput, compilationProvider, componentProvider); - translator.compileAndExecute(hcc, hds, resultDelivery, new IStatementExecutor.Stats()); + translator.compileAndExecute(hcc, hds, resultDelivery, null, new IStatementExecutor.Stats()); } catch (AsterixException | TokenMgrError | org.apache.asterix.aqlplus.parser.TokenMgrError pe) { response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR); GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, pe.getMessage(), pe); String errorMessage = ResultUtil.buildParseExceptionMessage(pe, query); ObjectNode errorResp = ResultUtil.getErrorResponse(2, errorMessage, "", ResultUtil.extractFullStackTrace(pe)); - sessionConfig.out().write(new ObjectMapper().writeValueAsString(errorResp)); + sessionOutput.out().write(new ObjectMapper().writeValueAsString(errorResp)); } catch (Exception e) { GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, e.getMessage(), e); response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR); - ResultUtil.apiErrorHandler(sessionConfig.out(), e); + ResultUtil.apiErrorHandler(sessionOutput.out(), e); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ResultUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ResultUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ResultUtil.java index fe6fa89..3bcd670 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ResultUtil.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ResultUtil.java @@ -36,10 +36,10 @@ import java.util.stream.Stream; import org.apache.asterix.app.result.ResultHandle; import org.apache.asterix.app.result.ResultPrinter; import org.apache.asterix.app.result.ResultReader; -import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.common.api.IApplicationContext; import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.translator.IStatementExecutor.Stats; -import org.apache.asterix.translator.SessionConfig; +import org.apache.asterix.translator.SessionOutput; import org.apache.http.ParseException; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksAppendable; @@ -77,29 +77,29 @@ public class ResultUtil { return escaped; } - public static void printResults(ICcApplicationContext appCtx, ResultReader resultReader, SessionConfig conf, + public static void printResults(IApplicationContext appCtx, ResultReader resultReader, SessionOutput output, Stats stats, ARecordType recordType) throws HyracksDataException { - new ResultPrinter(appCtx, conf, stats, recordType).print(resultReader); + new ResultPrinter(appCtx, output, stats, recordType).print(resultReader); } - public static void printResults(ICcApplicationContext appCtx, String record, SessionConfig conf, Stats stats, + public static void printResults(IApplicationContext appCtx, String record, SessionOutput output, Stats stats, ARecordType recordType) throws HyracksDataException { - new ResultPrinter(appCtx, conf, stats, recordType).print(record); + new ResultPrinter(appCtx, output, stats, recordType).print(record); } - public static void printResultHandle(SessionConfig conf, ResultHandle handle) throws HyracksDataException { + public static void printResultHandle(SessionOutput output, ResultHandle handle) throws HyracksDataException { try { - final AlgebricksAppendable app = new AlgebricksAppendable(conf.out()); - conf.appendHandle(app, handle.toString()); + final AlgebricksAppendable app = new AlgebricksAppendable(output.out()); + output.appendHandle(app, handle.toString()); } catch (AlgebricksException e) { LOGGER.warn("error printing handle", e); } } - public static void printStatus(SessionConfig conf, AbstractQueryApiServlet.ResultStatus rs) { + public static void printStatus(SessionOutput output, AbstractQueryApiServlet.ResultStatus rs) { try { - final AlgebricksAppendable app = new AlgebricksAppendable(conf.out()); - conf.appendStatus(app, rs.str()); + final AlgebricksAppendable app = new AlgebricksAppendable(output.out()); + output.appendStatus(app, rs.str()); } catch (AlgebricksException e) { LOGGER.warn("error printing status", e); } @@ -318,4 +318,35 @@ public class ResultUtil { return errorTemplate; } + public static SessionOutput.ResultDecorator createPreResultDecorator() { + return new SessionOutput.ResultDecorator() { + int resultNo = -1; + + @Override + public AlgebricksAppendable append(AlgebricksAppendable app) throws AlgebricksException { + app.append("\t\""); + app.append(AbstractQueryApiServlet.ResultFields.RESULTS.str()); + if (resultNo >= 0) { + app.append('-').append(String.valueOf(resultNo)); + } + ++resultNo; + app.append("\": "); + return app; + } + }; + } + + public static SessionOutput.ResultDecorator createPostResultDecorator() { + return app -> app.append("\t,\n"); + } + + public static SessionOutput.ResultAppender createResultHandleAppender(String handleUrl) { + return (app, handle) -> app.append("\t\"").append(AbstractQueryApiServlet.ResultFields.HANDLE.str()) + .append("\": \"").append(handleUrl).append(handle).append("\",\n"); + } + + public static SessionOutput.ResultAppender createResultStatusAppender() { + return (app, status) -> app.append("\t\"").append(AbstractQueryApiServlet.ResultFields.STATUS.str()) + .append("\": \"").append(status).append("\",\n"); + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ServletConstants.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ServletConstants.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ServletConstants.java index a9d4e22..b815d76 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ServletConstants.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ServletConstants.java @@ -22,8 +22,9 @@ public class ServletConstants { public static final String HYRACKS_CONNECTION_ATTR = "org.apache.asterix.HYRACKS_CONNECTION"; public static final String HYRACKS_DATASET_ATTR = "org.apache.asterix.HYRACKS_DATASET"; public static final String ASTERIX_APP_CONTEXT_INFO_ATTR = "org.apache.asterix.APP_CONTEXT_INFO"; - public static final String EXECUTOR_SERVICE_ATTR = "org.apache.asterix.EXECUTOR_SERVICE_ATTR"; + public static final String EXECUTOR_SERVICE_ATTR = "org.apache.asterix.EXECUTOR_SERVICE"; public static final String RUNNING_QUERIES_ATTR = "org.apache.asterix.RUNINNG_QUERIES"; + public static final String SERVICE_CONTEXT_ATTR = "org.apache.asterix.SERVICE_CONTEXT"; private ServletConstants() { } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java index ecf2c53..a9d24b9 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java @@ -36,6 +36,7 @@ import org.apache.asterix.translator.IStatementExecutor; import org.apache.asterix.translator.IStatementExecutorFactory; import org.apache.asterix.translator.SessionConfig; import org.apache.asterix.translator.SessionConfig.OutputFormat; +import org.apache.asterix.translator.SessionOutput; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.job.JobSpecification; @@ -99,16 +100,17 @@ public class AsterixJavaClient { List statements = parser.parse(); MetadataManager.INSTANCE.init(); - SessionConfig conf = new SessionConfig(writer, OutputFormat.ADM, optimize, true, generateBinaryRuntime); + SessionConfig conf = new SessionConfig(OutputFormat.ADM, optimize, true, generateBinaryRuntime); conf.setOOBData(false, printRewrittenExpressions, printLogicalPlan, printOptimizedPlan, printJob); if (printPhysicalOpsOnly) { conf.set(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS, true); } + SessionOutput output = new SessionOutput(conf, writer); - IStatementExecutor translator = statementExecutorFactory.create(appCtx, statements, conf, compilationProvider, + IStatementExecutor translator = statementExecutorFactory.create(appCtx, statements, output, compilationProvider, storageComponentProvider); translator.compileAndExecute(hcc, null, QueryTranslator.ResultDelivery.IMMEDIATE, - new IStatementExecutor.Stats()); + null, new IStatementExecutor.Stats()); writer.flush(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CCExtensionManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CCExtensionManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CCExtensionManager.java index 6c6f2af..0c6b2cc 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CCExtensionManager.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CCExtensionManager.java @@ -110,11 +110,11 @@ public class CCExtensionManager implements IAlgebraExtensionManager { return statementExecutorFactory; } - public ILangCompilationProvider getAqlCompilationProvider() { - return aqlCompilationProvider; - } - - public ILangCompilationProvider getSqlppCompilationProvider() { - return sqlppCompilationProvider; + public ILangCompilationProvider getCompilationProvider(Language lang) { + switch (lang) { + case AQL: return aqlCompilationProvider; + case SQLPP: return sqlppCompilationProvider; + default: throw new IllegalArgumentException(String.valueOf(lang)); + } } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java new file mode 100644 index 0000000..defa180 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.asterix.app.message; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.asterix.algebra.base.ILangExtension; +import org.apache.asterix.api.http.server.ResultUtil; +import org.apache.asterix.app.cc.CCExtensionManager; +import org.apache.asterix.common.api.IClusterManagementWork; +import org.apache.asterix.common.config.GlobalConfig; +import org.apache.asterix.common.context.IStorageComponentProvider; +import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.common.messaging.api.ICcAddressedMessage; +import org.apache.asterix.compiler.provider.ILangCompilationProvider; +import org.apache.asterix.hyracks.bootstrap.CCApplication; +import org.apache.asterix.lang.aql.parser.TokenMgrError; +import org.apache.asterix.lang.common.base.IParser; +import org.apache.asterix.lang.common.base.Statement; +import org.apache.asterix.messaging.CCMessageBroker; +import org.apache.asterix.metadata.MetadataManager; +import org.apache.asterix.runtime.utils.ClusterStateManager; +import org.apache.asterix.translator.IStatementExecutor; +import org.apache.asterix.translator.IStatementExecutorContext; +import org.apache.asterix.translator.IStatementExecutorFactory; +import org.apache.asterix.translator.SessionConfig; +import org.apache.asterix.translator.SessionOutput; +import org.apache.hyracks.api.application.ICCServiceContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.control.cc.ClusterControllerService; + +public final class ExecuteStatementRequestMessage implements ICcAddressedMessage { + private static final long serialVersionUID = 1L; + + private static final Logger LOGGER = Logger.getLogger(ExecuteStatementRequestMessage.class.getName()); + + private final String requestNodeId; + + private final long requestMessageId; + + private final ILangExtension.Language lang; + + private final String statementsText; + + private final SessionConfig sessionConfig; + + private final IStatementExecutor.ResultDelivery delivery; + + private final String clientContextID; + + private final String handleUrl; + + public ExecuteStatementRequestMessage(String requestNodeId, long requestMessageId, ILangExtension.Language lang, + String statementsText, SessionConfig sessionConfig, IStatementExecutor.ResultDelivery delivery, + String clientContextID, String handleUrl) { + this.requestNodeId = requestNodeId; + this.requestMessageId = requestMessageId; + this.lang = lang; + this.statementsText = statementsText; + this.sessionConfig = sessionConfig; + this.delivery = delivery; + this.clientContextID = clientContextID; + this.handleUrl = handleUrl; + } + + @Override + public void handle(ICcApplicationContext ccAppCtx) throws HyracksDataException, InterruptedException { + ICCServiceContext ccSrvContext = ccAppCtx.getServiceContext(); + ClusterControllerService ccSrv = (ClusterControllerService) ccSrvContext.getControllerService(); + CCApplication ccApp = (CCApplication) ccSrv.getApplication(); + CCMessageBroker messageBroker = (CCMessageBroker) ccSrvContext.getMessageBroker(); + CCExtensionManager ccExtMgr = (CCExtensionManager) ccAppCtx.getExtensionManager(); + ILangCompilationProvider compilationProvider = ccExtMgr.getCompilationProvider(lang); + IStorageComponentProvider storageComponentProvider = ccAppCtx.getStorageComponentProvider(); + IStatementExecutorFactory statementExecutorFactory = ccApp.getStatementExecutorFactory(); + IStatementExecutorContext statementExecutorContext = ccApp.getStatementExecutorContext(); + + ccSrv.getExecutor().submit(() -> { + ExecuteStatementResponseMessage responseMsg = new ExecuteStatementResponseMessage(requestMessageId); + + try { + final IClusterManagementWork.ClusterState clusterState = ClusterStateManager.INSTANCE.getState(); + if (clusterState != IClusterManagementWork.ClusterState.ACTIVE) { + throw new IllegalStateException("Cannot execute request, cluster is " + clusterState); + } + + IParser parser = compilationProvider.getParserFactory().createParser(statementsText); + List statements = parser.parse(); + + StringWriter outWriter = new StringWriter(256); + PrintWriter outPrinter = new PrintWriter(outWriter); + SessionOutput.ResultDecorator resultPrefix = ResultUtil.createPreResultDecorator(); + SessionOutput.ResultDecorator resultPostfix = ResultUtil.createPostResultDecorator(); + SessionOutput.ResultAppender appendHandle = ResultUtil.createResultHandleAppender(handleUrl); + SessionOutput.ResultAppender appendStatus = ResultUtil.createResultStatusAppender(); + SessionOutput sessionOutput = new SessionOutput(sessionConfig, outPrinter, resultPrefix, resultPostfix, + appendHandle, appendStatus); + + IStatementExecutor.ResultMetadata outMetadata = new IStatementExecutor.ResultMetadata(); + + MetadataManager.INSTANCE.init(); + IStatementExecutor translator = statementExecutorFactory.create(ccAppCtx, statements, sessionOutput, + compilationProvider, storageComponentProvider); + translator.compileAndExecute(ccAppCtx.getHcc(), null, delivery, outMetadata, + new IStatementExecutor.Stats(), clientContextID, statementExecutorContext); + + outPrinter.close(); + responseMsg.setResult(outWriter.toString()); + responseMsg.setMetadata(outMetadata); + } catch (TokenMgrError | org.apache.asterix.aqlplus.parser.TokenMgrError pe) { + GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, pe.getMessage(), pe); + responseMsg.setError(pe); + } catch (AsterixException pe) { + GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, pe.getMessage(), pe); + responseMsg.setError(new AsterixException(pe.getMessage())); + } catch (Exception e) { + String estr = e.toString(); + GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, estr, e); + responseMsg.setError(new Exception(estr)); + } + + try { + messageBroker.sendApplicationMessageToNC(responseMsg, requestNodeId); + } catch (Exception e) { + LOGGER.log(Level.WARNING, e.toString(), e); + } + }); + } + + @Override + public String toString() { + return String.format("%s(id=%s, from=%s): %s", getClass().getSimpleName(), requestMessageId, requestNodeId, + statementsText); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java new file mode 100644 index 0000000..4f9aa0c --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.asterix.app.message; + +import java.util.concurrent.TimeUnit; + +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.messaging.api.INcAddressedMessage; +import org.apache.asterix.common.messaging.api.MessageFuture; +import org.apache.asterix.messaging.NCMessageBroker; +import org.apache.asterix.translator.IStatementExecutor; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public final class ExecuteStatementResponseMessage implements INcAddressedMessage { + private static final long serialVersionUID = 1L; + + public static final long DEFAULT_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(5); + + private final long requestMessageId; + + private String result; + + private IStatementExecutor.ResultMetadata metadata; + + private Throwable error; + + public ExecuteStatementResponseMessage(long requestMessageId) { + this.requestMessageId = requestMessageId; + } + + @Override + public void handle(INcApplicationContext appCtx) throws HyracksDataException, InterruptedException { + NCMessageBroker mb = (NCMessageBroker) appCtx.getServiceContext().getMessageBroker(); + MessageFuture future = mb.deregisterMessageFuture(requestMessageId); + if (future != null) { + future.complete(this); + } + } + + public Throwable getError() { + return error; + } + + public void setError(Throwable error) { + this.error = error; + } + + public String getResult() { + return result; + } + + public void setResult(String result) { + this.result = result; + } + + public IStatementExecutor.ResultMetadata getMetadata() { + return metadata; + } + + public void setMetadata(IStatementExecutor.ResultMetadata metadata) { + this.metadata = metadata; + } + + @Override + public String toString() { + return String.format("%s(id=%s): %d characters", getClass().getSimpleName(), requestMessageId, + result != null ? result.length() : 0); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4671f712/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java index 7ed3aef..452d13e 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java @@ -24,10 +24,11 @@ import java.io.IOException; import java.io.StringWriter; import java.nio.ByteBuffer; -import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.common.api.IApplicationContext; import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.translator.IStatementExecutor.Stats; import org.apache.asterix.translator.SessionConfig; +import org.apache.asterix.translator.SessionOutput; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksAppendable; import org.apache.hyracks.api.comm.IFrame; @@ -47,6 +48,7 @@ public class ResultPrinter { private final FrameManager resultDisplayFrameMgr; + private final SessionOutput output; private final SessionConfig conf; private final Stats stats; private final ARecordType recordType; @@ -62,8 +64,9 @@ public class ResultPrinter { private ObjectMapper om; private ObjectWriter ow; - public ResultPrinter(ICcApplicationContext appCtx, SessionConfig conf, Stats stats, ARecordType recordType) { - this.conf = conf; + public ResultPrinter(IApplicationContext appCtx, SessionOutput output, Stats stats, ARecordType recordType) { + this.output = output; + this.conf = output.config(); this.stats = stats; this.recordType = recordType; this.indentJSON = conf.is(SessionConfig.FORMAT_INDENT_JSON); @@ -112,18 +115,18 @@ public class ResultPrinter { // If we're outputting CSV with a header, the HTML header was already // output by displayCSVHeader(), so skip it here if (conf.is(SessionConfig.FORMAT_HTML)) { - conf.out().println("

    Results:

    "); - conf.out().println("
    ");
    +            output.out().println("

    Results:

    "); + output.out().println("
    ");
             }
     
             try {
    -            conf.resultPrefix(new AlgebricksAppendable(conf.out()));
    +            output.resultPrefix(new AlgebricksAppendable(output.out()));
             } catch (AlgebricksException e) {
                 throw new HyracksDataException(e);
             }
     
             if (conf.is(SessionConfig.FORMAT_WRAPPER_ARRAY)) {
    -            conf.out().print("[ ");
    +            output.out().print("[ ");
                 wrapArray = true;
             }
     
    @@ -134,29 +137,29 @@ public class ResultPrinter {
                 if (quoteRecord) {
                     StringWriter sw = new StringWriter();
                     appendCSVHeader(sw, recordType);
    -                conf.out().print(JSONUtil.quoteAndEscape(sw.toString()));
    -                conf.out().print("\n");
    +                output.out().print(JSONUtil.quoteAndEscape(sw.toString()));
    +                output.out().print("\n");
                     notFirst = true;
                 } else {
    -                appendCSVHeader(conf.out(), recordType);
    +                appendCSVHeader(output.out(), recordType);
                 }
             }
         }
     
         private void printPostfix() throws HyracksDataException {
    -        conf.out().flush();
    +        output.out().flush();
             if (wrapArray) {
    -            conf.out().println(" ]");
    +            output.out().println(" ]");
             }
             try {
    -            conf.resultPostfix(new AlgebricksAppendable(conf.out()));
    +            output.resultPostfix(new AlgebricksAppendable(output.out()));
             } catch (AlgebricksException e) {
                 throw new HyracksDataException(e);
             }
             if (conf.is(SessionConfig.FORMAT_HTML)) {
    -            conf.out().println("
    "); + output.out().println("
    "); } - conf.out().flush(); + output.out().flush(); } private void displayRecord(String result) throws HyracksDataException { @@ -177,7 +180,7 @@ public class ResultPrinter { // TODO(tillw): this is inefficient as well record = JSONUtil.quoteAndEscape(record); } - conf.out().print(record); + output.out().print(record); stats.setCount(stats.getCount() + 1); // TODO(tillw) fix this approximation stats.setSize(stats.getSize() + record.length()); @@ -211,7 +214,7 @@ public class ResultPrinter { } String result = new String(frameBytes, start, length, UTF_8); if (wrapArray && notFirst) { - conf.out().print(", "); + output.out().print(", "); } notFirst = true; displayRecord(result);