asterixdb-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Till Westmann (Code Review)" <do-not-re...@asterixdb.incubator.apache.org>
Subject Change in asterixdb[master]: WIP - async result
Date Sat, 17 Dec 2016 08:00:06 GMT
Till Westmann has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1394

Change subject: WIP - async result
......................................................................

WIP - async result

Change-Id: Iafba65d9c7bd8643c42e5126c8d89164ae328908
---
M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/util/FunctionCollection.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CompilerExtensionManager.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
M asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
A asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/SleepTypeComputer.java
A asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SleepDescriptor.java
8 files changed, 174 insertions(+), 11 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/94/1394/1

diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/util/FunctionCollection.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/util/FunctionCollection.java
index 144531e..cac9cd3 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/util/FunctionCollection.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/util/FunctionCollection.java
@@ -204,6 +204,7 @@
 import org.apache.asterix.runtime.evaluators.functions.SimilarityJaccardPrefixDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.SimilarityJaccardSortedCheckDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.SimilarityJaccardSortedDescriptor;
+import org.apache.asterix.runtime.evaluators.functions.SleepDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.SpatialAreaDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.SpatialCellDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.SpatialDistanceDescriptor;
@@ -411,6 +412,9 @@
         temp.add(OrderedListConstructorDescriptor.FACTORY);
         temp.add(UnorderedListConstructorDescriptor.FACTORY);
 
+        // Sleep function
+        temp.add(SleepDescriptor.FACTORY);
+
         // Inject failure function
         temp.add(InjectFailureDescriptor.FACTORY);
 
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CompilerExtensionManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CompilerExtensionManager.java
index c994abd..023fd0c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CompilerExtensionManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CompilerExtensionManager.java
@@ -21,6 +21,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ThreadFactory;
 
 import org.apache.asterix.algebra.base.ILangExtension;
 import org.apache.asterix.algebra.base.ILangExtension.Language;
@@ -60,12 +61,12 @@
      * @throws ClassNotFoundException
      * @throws HyracksDataException
      */
-    public CompilerExtensionManager(List<AsterixExtension> list)
+    public CompilerExtensionManager(List<AsterixExtension> list, ThreadFactory threadFactory)
             throws InstantiationException, IllegalAccessException, ClassNotFoundException, HyracksDataException {
         Pair<ExtensionId, ILangCompilationProvider> aqlcp = null;
         Pair<ExtensionId, ILangCompilationProvider> sqlppcp = null;
         IStatementExecutorExtension see = null;
-        defaultQueryTranslatorFactory = new DefaultStatementExecutorFactory();
+        defaultQueryTranslatorFactory = new DefaultStatementExecutorFactory(threadFactory);
 
         if (list != null) {
             for (AsterixExtension extensionConf : list) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java
index 6cdf329..8204c5f 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java
@@ -19,18 +19,30 @@
 package org.apache.asterix.app.translator;
 
 import java.util.List;
+import java.util.concurrent.ThreadFactory;
 
 import org.apache.asterix.compiler.provider.ILangCompilationProvider;
 import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.translator.IStatementExecutorFactory;
 import org.apache.asterix.translator.SessionConfig;
+import org.apache.hyracks.control.common.utils.HyracksThreadFactory;
 
 public class DefaultStatementExecutorFactory implements IStatementExecutorFactory {
+
+    protected final ThreadFactory threadFactory;
+
+    public DefaultStatementExecutorFactory() {
+        this(new HyracksThreadFactory("DefaultStatementExecutorFactory"));
+    }
+
+    public DefaultStatementExecutorFactory(ThreadFactory threadFactory) {
+        this.threadFactory = threadFactory;
+    }
 
     @Override
     public QueryTranslator create(List<Statement> aqlStatements, SessionConfig conf,
             ILangCompilationProvider compilationProvider) {
-        return new QueryTranslator(aqlStatements, conf, compilationProvider);
+        return new QueryTranslator(aqlStatements, conf, compilationProvider, threadFactory);
     }
 
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 643a352..bdf2075 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -37,6 +37,7 @@
 import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Random;
+import java.util.concurrent.ThreadFactory;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -53,12 +54,12 @@
 import org.apache.asterix.app.result.ResultHandle;
 import org.apache.asterix.app.result.ResultReader;
 import org.apache.asterix.app.result.ResultUtil;
-import org.apache.asterix.common.config.ExternalProperties;
 import org.apache.asterix.common.config.ClusterProperties;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.DatasetConfig.ExternalDatasetTransactionState;
 import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
+import org.apache.asterix.common.config.ExternalProperties;
 import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.exceptions.AsterixException;
@@ -226,15 +227,17 @@
     protected final List<FunctionDecl> declaredFunctions;
     protected final APIFramework apiFramework;
     protected final IRewriterFactory rewriterFactory;
+    protected ThreadFactory threadFactory;
 
     public QueryTranslator(List<Statement> statements, SessionConfig conf,
-            ILangCompilationProvider compliationProvider) {
+            ILangCompilationProvider compliationProvider, ThreadFactory threadFactory) {
         this.statements = statements;
         this.sessionConfig = conf;
         this.declaredFunctions = getDeclaredFunctions(statements);
         this.apiFramework = new APIFramework(compliationProvider);
         this.rewriterFactory = compliationProvider.getRewriterFactory();
-        activeDefaultDataverse = MetadataBuiltinEntities.DEFAULT_DATAVERSE;
+        this.activeDefaultDataverse = MetadataBuiltinEntities.DEFAULT_DATAVERSE;
+        this.threadFactory = threadFactory;
     }
 
     protected List<FunctionDecl> getDeclaredFunctions(List<Statement> statements) {
@@ -2552,7 +2555,7 @@
         jobsToExecute.add(ExternalIndexingOperations.compactFilesIndexJobSpec(ds, metadataProvider));
     }
 
-    protected JobSpecification handleQuery(MetadataProvider metadataProvider, Query query,
+    protected void handleQuery(MetadataProvider metadataProvider, Query query,
             IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats)
             throws Exception {
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
@@ -2567,11 +2570,9 @@
 
             if (query.isExplain()) {
                 sessionConfig.out().flush();
-                return jobSpec;
             } else if (sessionConfig.isExecuteQuery() && jobSpec != null) {
                 handleQueryResult(metadataProvider, hcc, hdc, jobSpec, resultDelivery, stats);
             }
-            return jobSpec;
         } catch (Exception e) {
             LOGGER.log(Level.INFO, e.getMessage(), e);
             if (bActiveTxn) {
@@ -2596,7 +2597,18 @@
         switch (resultDelivery) {
             case ASYNC:
                 ResultUtil.printResultHandle(new ResultHandle(jobId, metadataProvider.getResultSetId()), sessionConfig);
-                hcc.waitForCompletion(jobId);
+                if (threadFactory != null) {
+                    threadFactory.newThread(() -> {
+                        try {
+                            hcc.waitForCompletion(jobId);
+                        } catch (Exception e) {
+                            GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE,
+                                    resultDelivery.name() + " job " + "with id " + jobId + " failed");
+                        }
+                    });
+                } else {
+                    hcc.waitForCompletion(jobId);
+                }
                 break;
             case IMMEDIATE:
                 hcc.waitForCompletion(jobId);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index 25a5418..32d90dd 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -103,7 +103,7 @@
         ExternalLibraryUtils.setUpExternaLibraries(libraryManager, false);
         AppContextInfo.initialize(appCtx, getNewHyracksClientConnection(), GlobalRecoveryManager.instance(),
                 libraryManager, resourceIdManager, () -> MetadataManager.INSTANCE);
-        ccExtensionManager = new CompilerExtensionManager(getExtensions());
+        ccExtensionManager = new CompilerExtensionManager(getExtensions(), appCtx.getThreadFactory());
         AppContextInfo.INSTANCE.setExtensionManager(ccExtensionManager);
 
         final CCConfig ccConfig = controllerService.getCCConfig();
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
index 089f804..711ea2f 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
@@ -91,6 +91,7 @@
 import org.apache.asterix.om.typecomputer.impl.RecordPairsTypeComputer;
 import org.apache.asterix.om.typecomputer.impl.RecordRemoveFieldsTypeComputer;
 import org.apache.asterix.om.typecomputer.impl.ScalarVersionOfAggregateResultType;
+import org.apache.asterix.om.typecomputer.impl.SleepTypeComputer;
 import org.apache.asterix.om.typecomputer.impl.StringBooleanTypeComputer;
 import org.apache.asterix.om.typecomputer.impl.StringInt32TypeComputer;
 import org.apache.asterix.om.typecomputer.impl.StringIntToStringTypeComputer;
@@ -647,6 +648,8 @@
             "spatial-cell", 4);
     public static final FunctionIdentifier SWITCH_CASE = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
             "switch-case", FunctionIdentifier.VARARGS);
+    public static final FunctionIdentifier SLEEP = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "sleep", 2);
     public static final FunctionIdentifier INJECT_FAILURE = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
             "inject-failure", 2);
     public static final FunctionIdentifier FLOW_RECORD = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
@@ -1053,6 +1056,7 @@
         addPrivateFunction(SUBSET_COLLECTION, SubsetCollectionTypeComputer.INSTANCE, true);
         addFunction(SUBSTRING, SubstringTypeComputer.INSTANCE, true);
         addFunction(SWITCH_CASE, SwitchCaseComputer.INSTANCE, true);
+        addFunction(SLEEP, SleepTypeComputer.INSTANCE, false);
         addPrivateFunction(INJECT_FAILURE, InjectFailureTypeComputer.INSTANCE, true);
         addPrivateFunction(CAST_TYPE, CastTypeComputer.INSTANCE, true);
 
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/SleepTypeComputer.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/SleepTypeComputer.java
new file mode 100644
index 0000000..7063698
--- /dev/null
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/SleepTypeComputer.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.om.typecomputer.impl;
+
+import org.apache.asterix.om.exceptions.TypeMismatchException;
+import org.apache.asterix.om.typecomputer.base.AbstractResultTypeComputer;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+
+public class SleepTypeComputer extends AbstractResultTypeComputer {
+    public static final SleepTypeComputer INSTANCE = new SleepTypeComputer();
+
+    @Override
+    public void checkArgType(String funcName, int argIndex, IAType type) throws AlgebricksException {
+        if (argIndex == 1) {
+            switch (type.getTypeTag()) {
+                case INT8:
+                case INT16:
+                case INT32:
+                case INT64:
+                    break;
+                default:
+                    throw new TypeMismatchException(funcName, argIndex, type.getTypeTag(), ATypeTag.INT8, ATypeTag.INT16,
+                            ATypeTag.INT32, ATypeTag.INT64);
+            }
+        }
+    }
+
+    @Override
+    public IAType getResultType(ILogicalExpression expr, IAType... types) throws AlgebricksException {
+        return types[0];
+    }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SleepDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SleepDescriptor.java
new file mode 100644
index 0000000..663a21d
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SleepDescriptor.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.evaluators.functions;
+
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import org.apache.asterix.om.types.hierachy.ATypeHierarchy;
+import org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class SleepDescriptor extends AbstractScalarFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final IFunctionDescriptorFactory FACTORY = SleepDescriptor::new;
+
+    @Override
+    public IScalarEvaluatorFactory createEvaluatorFactory(final IScalarEvaluatorFactory[] args) {
+        return new IScalarEvaluatorFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public IScalarEvaluator createScalarEvaluator(final IHyracksTaskContext ctx) throws HyracksDataException {
+                return new IScalarEvaluator() {
+
+                    private IPointable argTime = new VoidPointable();
+                    private final IScalarEvaluator evalValue = args[0].createScalarEvaluator(ctx);
+                    private final IScalarEvaluator evalTime = args[1].createScalarEvaluator(ctx);
+
+                    @Override
+                    public void evaluate(IFrameTupleReference tuple, IPointable result) throws HyracksDataException {
+                        evalValue.evaluate(tuple, result);
+                        evalTime.evaluate(tuple, argTime);
+
+                        final byte[] bytes = argTime.getByteArray();
+                        final int offset = argTime.getStartOffset();
+                        final long time = ATypeHierarchy.getLongValue(getIdentifier().getName(), 1, bytes, offset);
+
+                        try {
+                            Thread.sleep(time / 1000000, (int) (time % 1000000));
+                        } catch (InterruptedException e) {
+                            throw new HyracksDataException(e);
+                        }
+                    }
+                };
+            }
+        };
+    }
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return BuiltinFunctions.SLEEP;
+    }
+
+}

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1394
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Iafba65d9c7bd8643c42e5126c8d89164ae328908
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Till Westmann <tillw@apache.org>

Mime
View raw message