asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From buyin...@apache.org
Subject [2/2] incubator-asterixdb git commit: Fix ASTERIXDB-1194, ASTERIXDB-1195, ASTERIXDB-1196, ASTERIXDB-1197.
Date Wed, 25 Nov 2015 07:20:05 GMT
Fix ASTERIXDB-1194,ASTERIXDB-1195,ASTERIXDB-1196,ASTERIXDB-1197.

Change-Id: I7d167b64bf9ec754182b5b2fe44dfc7e5908c686
Reviewed-on: https://asterix-gerrit.ics.uci.edu/325
Reviewed-by: Till Westmann <tillw@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>


Project: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/commit/751315f8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/tree/751315f8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/diff/751315f8

Branch: refs/heads/master
Commit: 751315f870ed6c297efb96751678eeb620d7335b
Parents: d7585c5
Author: Yingyi Bu <buyingyi@gmail.com>
Authored: Tue Nov 24 22:08:41 2015 -0800
Committer: Yingyi Bu <buyingyi@gmail.com>
Committed: Tue Nov 24 23:16:30 2015 -0800

----------------------------------------------------------------------
 .../common/AsterixHyracksIntegrationUtil.java   |  50 ++-
 .../api/http/servlet/ConnectorAPIServlet.java   |  25 +-
 .../api/http/servlet/UpdateAPIServlet.java      |   5 +-
 .../asterix/aql/translator/QueryTranslator.java | 426 +++++++++----------
 .../org/apache/asterix/drivers/AsterixCLI.java  |   4 +-
 .../apache/asterix/file/DatasetOperations.java  |   6 +-
 .../apache/asterix/file/IndexOperations.java    |   2 +-
 .../file/SecondaryBTreeOperationsHelper.java    |   5 +-
 .../apache/asterix/util/FlushDatasetUtils.java  |  76 ++++
 .../java/org/apache/asterix/util/JobUtils.java  |  50 +++
 .../resources/asterix-build-configuration.xml   |   4 +
 .../http/servlet/ConnectorAPIServletTest.java   |  11 +-
 .../aql/translator/QueryTranslatorTest.java     | 112 +++++
 .../org/apache/asterix/test/dml/DmlTest.java    |   4 +-
 .../asterix/test/metadata/MetadataTest.java     |  23 +-
 .../asterix/test/optimizer/OptimizerTest.java   |  23 +-
 .../asterix/test/runtime/ExecutionTestUtil.java |  30 +-
 .../config/AsterixCompilerProperties.java       |   7 +
 .../resources/conf/asterix-configuration.xml    |   5 +
 .../metadata/utils/MetadataLockManager.java     |  34 +-
 .../apache/asterix/om/types/ARecordType.java    |   3 +
 .../om/util/JSONDeserializerForTypes.java       |  48 +--
 asterix-tools/pom.xml                           |   4 +-
 .../resources/base-asterix-configuration.xml    |   5 +
 .../configs/base-asterix-configuration.xml      |   5 +
 25 files changed, 585 insertions(+), 382 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/751315f8/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index 2897a15..990decc 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -19,11 +19,15 @@
 package org.apache.asterix.api.common;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.EnumSet;
 
+import org.apache.asterix.common.config.AsterixPropertiesAccessor;
+import org.apache.asterix.common.config.AsterixTransactionProperties;
 import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.hyracks.bootstrap.CCApplicationEntryPoint;
 import org.apache.asterix.hyracks.bootstrap.NCApplicationEntryPoint;
+import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.client.HyracksConnection;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.job.JobFlag;
@@ -36,6 +40,7 @@ import org.apache.hyracks.control.nc.NodeControllerService;
 
 public class AsterixHyracksIntegrationUtil {
 
+    private static final String IO_DIR_KEY = "java.io.tmpdir";
     public static final int NODES = 2;
     public static final int PARTITONS = 2;
 
@@ -47,7 +52,16 @@ public class AsterixHyracksIntegrationUtil {
     public static NodeControllerService[] ncs = new NodeControllerService[NODES];
     public static IHyracksClientConnection hcc;
 
-    public static void init() throws Exception {
+    protected static AsterixTransactionProperties txnProperties;
+
+    public static void init(boolean deleteOldInstanceData) throws Exception {
+        AsterixPropertiesAccessor apa = new AsterixPropertiesAccessor();
+        txnProperties = new AsterixTransactionProperties(apa);
+        if (deleteOldInstanceData) {
+            deleteTransactionLogs();
+            removeTestStorageFiles();
+        }
+
         CCConfig ccConfig = new CCConfig();
         ccConfig.clusterNetIpAddress = "127.0.0.1";
         ccConfig.clientNetIpAddress = "127.0.0.1";
@@ -61,6 +75,7 @@ public class AsterixHyracksIntegrationUtil {
         cc = new ClusterControllerService(ccConfig);
         cc.start();
 
+        // Starts ncs.
         int n = 0;
         for (String ncName : getNcNames()) {
             NCConfig ncConfig1 = new NCConfig();
@@ -86,7 +101,6 @@ public class AsterixHyracksIntegrationUtil {
             ncs[n].start();
             ++n;
         }
-
         hcc = new HyracksConnection(cc.getConfig().clientNetIpAddress, cc.getConfig().clientNetPort);
     }
 
@@ -110,14 +124,20 @@ public class AsterixHyracksIntegrationUtil {
         return hcc;
     }
 
-    public static void deinit() throws Exception {
+    public static void deinit(boolean deleteOldInstanceData) throws Exception {
         for (int n = 0; n < ncs.length; ++n) {
             if (ncs[n] != null)
                 ncs[n].stop();
 
         }
-        if (cc != null)
+        if (cc != null) {
             cc.stop();
+        }
+
+        if (deleteOldInstanceData) {
+            deleteTransactionLogs();
+            removeTestStorageFiles();
+        }
     }
 
     public static void runJob(JobSpecification spec) throws Exception {
@@ -127,6 +147,23 @@ public class AsterixHyracksIntegrationUtil {
         hcc.waitForCompletion(jobId);
     }
 
+    private static void removeTestStorageFiles() throws IOException {
+        File dir = new File(System.getProperty(IO_DIR_KEY));
+        for (String ncName : AsterixHyracksIntegrationUtil.getNcNames()) {
+            File ncDir = new File(dir, ncName);
+            FileUtils.deleteQuietly(ncDir);
+        }
+    }
+
+    private static void deleteTransactionLogs() throws Exception {
+        for (String ncId : AsterixHyracksIntegrationUtil.getNcNames()) {
+            File log = new File(txnProperties.getLogDirectory(ncId));
+            if (log.exists()) {
+                FileUtils.deleteDirectory(log);
+            }
+        }
+    }
+
     /**
      * main method to run a simple 2 node cluster in-process
      * suggested VM arguments: <code>-enableassertions -Xmx2048m -Dfile.encoding=UTF-8</code>
@@ -136,9 +173,10 @@ public class AsterixHyracksIntegrationUtil {
      */
     public static void main(String[] args) {
         Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
             public void run() {
                 try {
-                    deinit();
+                    deinit(false);
                 } catch (Exception e) {
                     e.printStackTrace();
                 }
@@ -147,7 +185,7 @@ public class AsterixHyracksIntegrationUtil {
         try {
             System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, "asterix-build-configuration.xml");
 
-            init();
+            init(false);
             while (true) {
                 Thread.sleep(10000);
             }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/751315f8/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ConnectorAPIServlet.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ConnectorAPIServlet.java b/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ConnectorAPIServlet.java
index b213bca..c83ce6a 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ConnectorAPIServlet.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ConnectorAPIServlet.java
@@ -40,6 +40,7 @@ import org.apache.asterix.metadata.declared.AqlMetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.utils.DatasetUtils;
 import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.util.FlushDatasetUtils;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.client.NodeControllerInfo;
 import org.apache.hyracks.dataflow.std.file.FileSplit;
@@ -108,7 +109,12 @@ public class ConnectorAPIServlet extends HttpServlet {
             pkStrBuf.delete(pkStrBuf.length() - 1, pkStrBuf.length());
 
             // Constructs the returned json object.
-            formResponseObject(jsonResponse, fileSplits, recordType, pkStrBuf.toString(), hcc.getNodeControllerInfos());
+            formResponseObject(jsonResponse, fileSplits, recordType, pkStrBuf.toString(), temp,
+                    hcc.getNodeControllerInfos());
+
+            // Flush the cached contents of the dataset to file system.
+            FlushDatasetUtils.flushDataset(hcc, metadataProvider, mdTxnCtx, dataverseName, datasetName, datasetName);
+
             // Metadata transaction commits.
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             // Writes file splits.
@@ -123,8 +129,10 @@ public class ConnectorAPIServlet extends HttpServlet {
     }
 
     private void formResponseObject(JSONObject jsonResponse, FileSplit[] fileSplits, ARecordType recordType,
-            String primaryKeys, Map<String, NodeControllerInfo> nodeMap) throws Exception {
+            String primaryKeys, boolean temp, Map<String, NodeControllerInfo> nodeMap) throws Exception {
         JSONArray partititons = new JSONArray();
+        // Whether the dataset is temp or not
+        jsonResponse.put("temp", temp);
         // Adds a primary key.
         jsonResponse.put("keys", primaryKeys);
         // Adds record type.
@@ -133,7 +141,7 @@ public class ConnectorAPIServlet extends HttpServlet {
         for (FileSplit split : fileSplits) {
             String ipAddress = nodeMap.get(split.getNodeName()).getNetworkAddress().getAddress().toString();
             String path = split.getLocalFile().getFile().getAbsolutePath();
-            FilePartition partition = new FilePartition(ipAddress, path);
+            FilePartition partition = new FilePartition(ipAddress, path, split.getIODeviceId());
             partititons.put(partition.toJSONObject());
         }
         // Generates the response object which contains the splits.
@@ -144,10 +152,12 @@ public class ConnectorAPIServlet extends HttpServlet {
 class FilePartition {
     private final String ipAddress;
     private final String path;
+    private final int ioDeviceId;
 
-    public FilePartition(String ipAddress, String path) {
+    public FilePartition(String ipAddress, String path, int ioDeviceId) {
         this.ipAddress = ipAddress;
         this.path = path;
+        this.ioDeviceId = ioDeviceId;
     }
 
     public String getIPAddress() {
@@ -158,6 +168,10 @@ class FilePartition {
         return path;
     }
 
+    public int getIODeviceId() {
+        return ioDeviceId;
+    }
+
     @Override
     public String toString() {
         return ipAddress + ":" + path;
@@ -167,6 +181,7 @@ class FilePartition {
         JSONObject partition = new JSONObject();
         partition.put("ip", ipAddress);
         partition.put("path", path);
+        partition.put("ioDeviceId", ioDeviceId);
         return partition;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/751315f8/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/UpdateAPIServlet.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/UpdateAPIServlet.java b/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/UpdateAPIServlet.java
index d75625f..3852020 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/UpdateAPIServlet.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/UpdateAPIServlet.java
@@ -34,17 +34,20 @@ public class UpdateAPIServlet extends RESTAPIServlet {
         super(compilationProvider);
     }
 
+    @Override
     protected String getQueryParameter(HttpServletRequest request) {
         return request.getParameter("statements");
     }
 
+    @Override
     protected List<Statement.Kind> getAllowedStatements() {
         Kind[] statementsArray = { Kind.DATAVERSE_DECL, Kind.DELETE, Kind.INSERT, Kind.UPDATE, Kind.DML_CMD_LIST,
                 Kind.LOAD, Kind.CONNECT_FEED, Kind.DISCONNECT_FEED, Kind.SET, Kind.COMPACT,
-                Kind.EXTERNAL_DATASET_REFRESH };
+                Kind.EXTERNAL_DATASET_REFRESH, Kind.RUN };
         return Arrays.asList(statementsArray);
     }
 
+    @Override
     protected String getErrorMessage() {
         return "Invalid statement: Non-Update statement %s to the Update API.";
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/751315f8/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java b/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
index fa55a47..51e1612 100644
--- a/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
+++ b/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
@@ -21,9 +21,9 @@ package org.apache.asterix.aql.translator;
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
-import java.io.PrintWriter;
 import java.rmi.RemoteException;
 import java.util.ArrayList;
 import java.util.Date;
@@ -40,10 +40,9 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.api.common.APIFramework;
-import org.apache.asterix.api.common.Job;
 import org.apache.asterix.api.common.SessionConfig;
 import org.apache.asterix.api.common.SessionConfig.OutputFormat;
-import org.apache.asterix.common.config.AsterixCompilerProperties;
+import org.apache.asterix.common.config.AsterixExternalProperties;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.DatasetConfig.ExternalDatasetTransactionState;
 import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
@@ -149,13 +148,11 @@ import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.om.types.TypeSignature;
 import org.apache.asterix.om.util.AsterixAppContextInfo;
+import org.apache.asterix.om.util.AsterixClusterProperties;
 import org.apache.asterix.optimizer.rules.IntroduceSecondaryIndexInsertDeleteRule;
 import org.apache.asterix.result.ResultReader;
 import org.apache.asterix.result.ResultUtils;
-import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
-import org.apache.asterix.runtime.operators.std.FlushDatasetOperatorDescriptor;
 import org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory;
-import org.apache.asterix.transaction.management.service.transaction.JobIdFactory;
 import org.apache.asterix.translator.AbstractLangTranslator;
 import org.apache.asterix.translator.CompiledStatements.CompiledConnectFeedStatement;
 import org.apache.asterix.translator.CompiledStatements.CompiledCreateIndexStatement;
@@ -169,32 +166,25 @@ import org.apache.asterix.translator.CompiledStatements.CompiledSubscribeFeedSta
 import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement;
 import org.apache.asterix.translator.TypeTranslator;
 import org.apache.asterix.translator.util.ValidateUtil;
+import org.apache.asterix.util.FlushDatasetUtils;
+import org.apache.asterix.util.JobUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.common.utils.Triple;
 import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression.FunctionKind;
 import org.apache.hyracks.algebricks.data.IAWriterFactory;
 import org.apache.hyracks.algebricks.data.IResultSerializerFactoryProvider;
-import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
-import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
-import org.apache.hyracks.algebricks.runtime.operators.std.EmptyTupleSourceRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.serializer.ResultSerializerFactoryProvider;
 import org.apache.hyracks.algebricks.runtime.writers.PrinterBasedWriterFactory;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.dataset.IHyracksDataset;
 import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 import org.apache.hyracks.dataflow.std.file.FileSplit;
-import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 import org.json.JSONArray;
 import org.json.JSONException;
@@ -641,7 +631,7 @@ public class QueryTranslator extends AbstractLangTranslator {
                 progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
 
                 //#. runJob
-                runJob(hcc, jobSpec, true);
+                JobUtils.runJob(hcc, jobSpec, true);
 
                 //#. begin new metadataTxn
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
@@ -674,8 +664,7 @@ public class QueryTranslator extends AbstractLangTranslator {
                     JobSpecification jobSpec = DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider);
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                     bActiveTxn = false;
-
-                    runJob(hcc, jobSpec, true);
+                    JobUtils.runJob(hcc, jobSpec, true);
                 } catch (Exception e2) {
                     e.addSuppressed(e2);
                     if (bActiveTxn) {
@@ -954,7 +943,7 @@ public class QueryTranslator extends AbstractLangTranslator {
                                 "Failed to create job spec for replicating Files Index For external dataset");
                     }
                     filesIndexReplicated = true;
-                    runJob(hcc, spec, true);
+                    JobUtils.runJob(hcc, spec, true);
                 }
             }
 
@@ -997,7 +986,7 @@ public class QueryTranslator extends AbstractLangTranslator {
             progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
 
             //#. create the index artifact in NC.
-            runJob(hcc, spec, true);
+            JobUtils.runJob(hcc, spec, true);
 
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
             bActiveTxn = true;
@@ -1011,7 +1000,7 @@ public class QueryTranslator extends AbstractLangTranslator {
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
 
-            runJob(hcc, spec, true);
+            JobUtils.runJob(hcc, spec, true);
 
             //#. begin new metadataTxn
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
@@ -1050,7 +1039,7 @@ public class QueryTranslator extends AbstractLangTranslator {
                             metadataProvider, ds);
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                     bActiveTxn = false;
-                    runJob(hcc, jobSpec, true);
+                    JobUtils.runJob(hcc, jobSpec, true);
                 } catch (Exception e2) {
                     e.addSuppressed(e2);
                     if (bActiveTxn) {
@@ -1072,7 +1061,7 @@ public class QueryTranslator extends AbstractLangTranslator {
 
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                     bActiveTxn = false;
-                    runJob(hcc, jobSpec, true);
+                    JobUtils.runJob(hcc, jobSpec, true);
                 } catch (Exception e2) {
                     e.addSuppressed(e2);
                     if (bActiveTxn) {
@@ -1270,7 +1259,7 @@ public class QueryTranslator extends AbstractLangTranslator {
             progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
 
             for (JobSpecification jobSpec : jobsToExecute) {
-                runJob(hcc, jobSpec, true);
+                JobUtils.runJob(hcc, jobSpec, true);
             }
 
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
@@ -1297,7 +1286,7 @@ public class QueryTranslator extends AbstractLangTranslator {
                 //   remove the all indexes in NC
                 try {
                     for (JobSpecification jobSpec : jobsToExecute) {
-                        runJob(hcc, jobSpec, true);
+                        JobUtils.runJob(hcc, jobSpec, true);
                     }
                 } catch (Exception e2) {
                     //do no throw exception since still the metadata needs to be compensated.
@@ -1391,12 +1380,12 @@ public class QueryTranslator extends AbstractLangTranslator {
 
                 //# disconnect the feeds
                 for (Pair<JobSpecification, Boolean> p : disconnectJobList.values()) {
-                    runJob(hcc, p.first, true);
+                    JobUtils.runJob(hcc, p.first, true);
                 }
 
                 //#. run the jobs
                 for (JobSpecification jobSpec : jobsToExecute) {
-                    runJob(hcc, jobSpec, true);
+                    JobUtils.runJob(hcc, jobSpec, true);
                 }
 
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
@@ -1434,7 +1423,7 @@ public class QueryTranslator extends AbstractLangTranslator {
 
                 //#. run the jobs
                 for (JobSpecification jobSpec : jobsToExecute) {
-                    runJob(hcc, jobSpec, true);
+                    JobUtils.runJob(hcc, jobSpec, true);
                 }
                 if (indexes.size() > 0) {
                     ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(ds);
@@ -1463,7 +1452,7 @@ public class QueryTranslator extends AbstractLangTranslator {
                 //   remove the all indexes in NC
                 try {
                     for (JobSpecification jobSpec : jobsToExecute) {
-                        runJob(hcc, jobSpec, true);
+                        JobUtils.runJob(hcc, jobSpec, true);
                     }
                 } catch (Exception e2) {
                     //do no throw exception since still the metadata needs to be compensated.
@@ -1561,7 +1550,7 @@ public class QueryTranslator extends AbstractLangTranslator {
                 progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
 
                 for (JobSpecification jobSpec : jobsToExecute) {
-                    runJob(hcc, jobSpec, true);
+                    JobUtils.runJob(hcc, jobSpec, true);
                 }
 
                 //#. begin a new transaction
@@ -1624,7 +1613,7 @@ public class QueryTranslator extends AbstractLangTranslator {
                 progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
 
                 for (JobSpecification jobSpec : jobsToExecute) {
-                    runJob(hcc, jobSpec, true);
+                    JobUtils.runJob(hcc, jobSpec, true);
                 }
 
                 //#. begin a new transaction
@@ -1654,7 +1643,7 @@ public class QueryTranslator extends AbstractLangTranslator {
                 //   remove the all indexes in NC
                 try {
                     for (JobSpecification jobSpec : jobsToExecute) {
-                        runJob(hcc, jobSpec, true);
+                        JobUtils.runJob(hcc, jobSpec, true);
                     }
                 } catch (Exception e2) {
                     //do no throw exception since still the metadata needs to be compensated.
@@ -1810,7 +1799,7 @@ public class QueryTranslator extends AbstractLangTranslator {
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
             if (spec != null) {
-                runJob(hcc, spec, true);
+                JobUtils.runJob(hcc, spec, true);
             }
         } catch (Exception e) {
             if (bActiveTxn) {
@@ -1844,7 +1833,7 @@ public class QueryTranslator extends AbstractLangTranslator {
             bActiveTxn = false;
 
             if (compiled != null) {
-                runJob(hcc, compiled, true);
+                JobUtils.runJob(hcc, compiled, true);
             }
 
         } catch (Exception e) {
@@ -1880,7 +1869,7 @@ public class QueryTranslator extends AbstractLangTranslator {
             bActiveTxn = false;
 
             if (compiled != null) {
-                runJob(hcc, compiled, true);
+                JobUtils.runJob(hcc, compiled, true);
             }
 
         } catch (Exception e) {
@@ -2152,7 +2141,7 @@ public class QueryTranslator extends AbstractLangTranslator {
                 for (IFeedJoint fj : triple.third) {
                     FeedLifecycleListener.INSTANCE.registerFeedJoint(fj);
                 }
-                runJob(hcc, pair.first, false);
+                JobUtils.runJob(hcc, pair.first, false);
                 IFeedAdapterFactory adapterFactory = pair.second;
                 if (adapterFactory.isRecordTrackingEnabled()) {
                     FeedLifecycleListener.INSTANCE.registerFeedIntakeProgressTracker(feedConnId,
@@ -2323,7 +2312,7 @@ public class QueryTranslator extends AbstractLangTranslator {
             JobSpecification jobSpec = specDisconnectType.first;
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
-            runJob(hcc, jobSpec, true);
+            JobUtils.runJob(hcc, jobSpec, true);
 
             if (!specDisconnectType.second) {
                 CentralFeedManager.getInstance().getFeedLoadManager().removeFeedActivity(connectionId);
@@ -2378,7 +2367,7 @@ public class QueryTranslator extends AbstractLangTranslator {
             bActiveTxn = false;
 
             if (compiled != null) {
-                runJob(hcc, alteredJobSpec, false);
+                JobUtils.runJob(hcc, alteredJobSpec, false);
             }
 
         } catch (Exception e) {
@@ -2429,7 +2418,6 @@ public class QueryTranslator extends AbstractLangTranslator {
                                 .getDataverse(metadataProvider.getMetadataTxnContext(), dataverseName);
                         jobsToExecute
                                 .add(DatasetOperations.compactDatasetJobSpec(dataverse, datasetName, metadataProvider));
-
                     }
                 }
             } else {
@@ -2458,7 +2446,7 @@ public class QueryTranslator extends AbstractLangTranslator {
 
             //#. run the jobs
             for (JobSpecification jobSpec : jobsToExecute) {
-                runJob(hcc, jobSpec, true);
+                JobUtils.runJob(hcc, jobSpec, true);
             }
         } catch (Exception e) {
             if (bActiveTxn) {
@@ -2486,7 +2474,7 @@ public class QueryTranslator extends AbstractLangTranslator {
 
             if (sessionConfig.isExecuteQuery() && compiled != null) {
                 GlobalConfig.ASTERIX_LOGGER.info(compiled.toJSON().toString(1));
-                JobId jobId = runJob(hcc, compiled, false);
+                JobId jobId = JobUtils.runJob(hcc, compiled, false);
 
                 JSONObject response = new JSONObject();
                 switch (resultDelivery) {
@@ -2664,14 +2652,14 @@ public class QueryTranslator extends AbstractLangTranslator {
             transactionState = ExternalDatasetTransactionState.BEGIN;
 
             //run the files update job
-            runJob(hcc, spec, true);
+            JobUtils.runJob(hcc, spec, true);
 
             for (Index index : indexes) {
                 if (!ExternalIndexingOperations.isFileIndex(index)) {
                     spec = ExternalIndexingOperations.buildIndexUpdateOp(ds, index, metadataFiles, deletedFiles,
                             addedFiles, appendedFiles, metadataProvider);
                     //run the files update job
-                    runJob(hcc, spec, true);
+                    JobUtils.runJob(hcc, spec, true);
                 }
             }
 
@@ -2690,7 +2678,7 @@ public class QueryTranslator extends AbstractLangTranslator {
             bActiveTxn = false;
             transactionState = ExternalDatasetTransactionState.READY_TO_COMMIT;
             // We don't release the latch since this job is expected to be quick
-            runJob(hcc, spec, true);
+            JobUtils.runJob(hcc, spec, true);
             // Start a new metadata transaction to record the final state of the transaction
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
             metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -2760,7 +2748,7 @@ public class QueryTranslator extends AbstractLangTranslator {
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                 bActiveTxn = false;
                 try {
-                    runJob(hcc, spec, true);
+                    JobUtils.runJob(hcc, spec, true);
                 } catch (Exception e2) {
                     // This should never happen -- fix throw illegal
                     e.addSuppressed(e2);
@@ -2812,242 +2800,212 @@ public class QueryTranslator extends AbstractLangTranslator {
     }
 
     private void handlePregelixStatement(AqlMetadataProvider metadataProvider, Statement stmt,
-            IHyracksClientConnection hcc) throws AsterixException, Exception {
-
+            IHyracksClientConnection hcc) throws Exception {
         RunStatement pregelixStmt = (RunStatement) stmt;
         boolean bActiveTxn = true;
-
         String dataverseNameFrom = getActiveDataverse(pregelixStmt.getDataverseNameFrom());
         String dataverseNameTo = getActiveDataverse(pregelixStmt.getDataverseNameTo());
         String datasetNameFrom = pregelixStmt.getDatasetNameFrom().getValue();
         String datasetNameTo = pregelixStmt.getDatasetNameTo().getValue();
 
-        if (dataverseNameFrom != dataverseNameTo) {
-            throw new AlgebricksException("Pregelix statements across different dataverses are not supported.");
-        }
-
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-
-        MetadataLockManager.INSTANCE.pregelixBegin(dataverseNameFrom, datasetNameFrom, datasetNameTo);
-
+        List<String> readDataverses = new ArrayList<String>();
+        readDataverses.add(dataverseNameFrom);
+        List<String> readDatasets = new ArrayList<String>();
+        readDatasets.add(datasetNameFrom);
+        MetadataLockManager.INSTANCE.insertDeleteBegin(dataverseNameTo, datasetNameTo, readDataverses, readDatasets);
         try {
+            prepareRunExternalRuntime(metadataProvider, hcc, pregelixStmt, dataverseNameFrom, dataverseNameTo,
+                    datasetNameFrom, datasetNameTo, mdTxnCtx);
 
-            // construct input paths
-            Index fromIndex = null;
-            List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseNameFrom,
-                    pregelixStmt.getDatasetNameFrom().getValue());
-            for (Index ind : indexes) {
-                if (ind.isPrimaryIndex())
-                    fromIndex = ind;
+            String pregelixHomeKey = "PREGELIX_HOME";
+            // Finds PREGELIX_HOME in system environment variables.
+            String pregelixHome = System.getenv(pregelixHomeKey);
+            // Finds PREGELIX_HOME in Java properties.
+            if (pregelixHome == null) {
+                pregelixHome = System.getProperty(pregelixHomeKey);
             }
-
-            if (fromIndex == null) {
-                throw new AlgebricksException("Tried to access non-existing dataset: " + datasetNameFrom);
+            // Finds PREGELIX_HOME in AsterixDB configuration.
+            if (pregelixHome == null) {
+                // Since there is a default value for PREGELIX_HOME in AsterixCompilerProperties, pregelixHome can never be null.
+                pregelixHome = AsterixAppContextInfo.getInstance().getCompilerProperties().getPregelixHome();
             }
 
-            Dataset datasetFrom = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseNameFrom, datasetNameFrom);
-            IFileSplitProvider fromSplits = metadataProvider.splitProviderAndPartitionConstraintsForDataset(
-                    dataverseNameFrom, datasetNameFrom, fromIndex.getIndexName(),
-                    datasetFrom.getDatasetDetails().isTemp()).first;
-            StringBuilder fromSplitsPaths = new StringBuilder();
+            // Constructs the pregelix command line.
+            List<String> cmd = constructPregelixCommand(pregelixStmt, dataverseNameFrom, datasetNameFrom,
+                    dataverseNameTo, datasetNameTo);
+            ProcessBuilder pb = new ProcessBuilder(cmd);
+            pb.directory(new File(pregelixHome));
+            pb.redirectErrorStream(true);
 
-            for (FileSplit f : fromSplits.getFileSplits()) {
-                fromSplitsPaths.append("asterix://" + f.getNodeName() + f.getLocalFile().getFile().getAbsolutePath());
-                fromSplitsPaths.append(",");
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            bActiveTxn = false;
+            // Executes the Pregelix command.
+            int resultState = executeExternalShellProgram(pb);
+            // Checks the return state of the external Pregelix command.
+            if (resultState != 0) {
+                throw new AlgebricksException(
+                        "Something went wrong executing your Pregelix Job. Perhaps the Pregelix cluster needs to be restarted. "
+                                + "Check the following things: Are the datatypes of Asterix and Pregelix matching? "
+                                + "Is the server configuration correct (node names, buffer sizes, framesize)? Check the logfiles for more details.");
             }
-            fromSplitsPaths.setLength(fromSplitsPaths.length() - 1);
+        } catch (Exception e) {
+            if (bActiveTxn) {
+                abort(e, e, mdTxnCtx);
+            }
+            throw e;
+        } finally {
+            MetadataLockManager.INSTANCE.insertDeleteEnd(dataverseNameTo, datasetNameTo, readDataverses, readDatasets);
+        }
+    }
+
+    // Prepares to run a program on external runtime.
+    private void prepareRunExternalRuntime(AqlMetadataProvider metadataProvider, IHyracksClientConnection hcc,
+            RunStatement pregelixStmt, String dataverseNameFrom, String dataverseNameTo, String datasetNameFrom,
+            String datasetNameTo, MetadataTransactionContext mdTxnCtx)
+                    throws AlgebricksException, AsterixException, Exception {
+        // Validates the source/sink dataverses and datasets.
+        Dataset fromDataset = metadataProvider.findDataset(dataverseNameFrom, datasetNameFrom);
+        if (fromDataset == null) {
+            throw new AsterixException("The source dataset " + datasetNameFrom + " in dataverse " + dataverseNameFrom
+                    + " could not be found for the Run command");
+        }
+        Dataset toDataset = metadataProvider.findDataset(dataverseNameTo, datasetNameTo);
+        if (toDataset == null) {
+            throw new AsterixException("The sink dataset " + datasetNameTo + " in dataverse " + dataverseNameTo
+                    + " could not be found for the Run command");
+        }
 
-            // Construct output paths
+        try {
+            // Find the primary index of the sink dataset.
             Index toIndex = null;
-            indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseNameTo,
+            List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseNameTo,
                     pregelixStmt.getDatasetNameTo().getValue());
-            for (Index ind : indexes) {
-                if (ind.isPrimaryIndex())
-                    toIndex = ind;
+            for (Index index : indexes) {
+                if (index.isPrimaryIndex()) {
+                    toIndex = index;
+                    break;
+                }
             }
-
             if (toIndex == null) {
                 throw new AlgebricksException("Tried to access non-existing dataset: " + datasetNameTo);
             }
+            // Cleans up the sink dataset -- Drop and then Create.
+            DropStatement dropStmt = new DropStatement(new Identifier(dataverseNameTo), pregelixStmt.getDatasetNameTo(),
+                    true);
+            this.handleDatasetDropStatement(metadataProvider, dropStmt, hcc);
+            IDatasetDetailsDecl idd = new InternalDetailsDecl(toIndex.getKeyFieldNames(), false, null,
+                    toDataset.getDatasetDetails().isTemp());
+            DatasetDecl createToDataset = new DatasetDecl(new Identifier(dataverseNameTo),
+                    pregelixStmt.getDatasetNameTo(), new Identifier(toDataset.getItemTypeName()),
+                    new Identifier(toDataset.getNodeGroupName()), toDataset.getCompactionPolicy(),
+                    toDataset.getCompactionPolicyProperties(), toDataset.getHints(), toDataset.getDatasetType(), idd,
+                    false);
+            this.handleCreateDatasetStatement(metadataProvider, createToDataset, hcc);
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new AlgebricksException("Error cleaning the result dataset. This should not happen.");
+        }
 
-            Dataset datasetTo = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseNameTo, datasetNameTo);
-            IFileSplitProvider toSplits = metadataProvider.splitProviderAndPartitionConstraintsForDataset(
-                    dataverseNameTo, datasetNameTo, toIndex.getIndexName(),
-                    datasetTo.getDatasetDetails().isTemp()).first;
-            StringBuilder toSplitsPaths = new StringBuilder();
-
-            for (FileSplit f : toSplits.getFileSplits()) {
-                toSplitsPaths.append("asterix://" + f.getNodeName() + f.getLocalFile().getFile().getAbsolutePath());
-                toSplitsPaths.append(",");
-            }
-            toSplitsPaths.setLength(toSplitsPaths.length() - 1);
-
-            try {
-                Dataset toDataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseNameTo, datasetNameTo);
-                DropStatement dropStmt = new DropStatement(new Identifier(dataverseNameTo),
-                        pregelixStmt.getDatasetNameTo(), true);
-                this.handleDatasetDropStatement(metadataProvider, dropStmt, hcc);
-
-                IDatasetDetailsDecl idd = new InternalDetailsDecl(toIndex.getKeyFieldNames(), false, null,
-                        toDataset.getDatasetDetails().isTemp());
-                DatasetDecl createToDataset = new DatasetDecl(new Identifier(dataverseNameTo),
-                        pregelixStmt.getDatasetNameTo(), new Identifier(toDataset.getItemTypeName()),
-                        new Identifier(toDataset.getNodeGroupName()), toDataset.getCompactionPolicy(),
-                        toDataset.getCompactionPolicyProperties(), toDataset.getHints(), toDataset.getDatasetType(),
-                        idd, false);
-                this.handleCreateDatasetStatement(metadataProvider, createToDataset, hcc);
-            } catch (Exception e) {
-                e.printStackTrace();
-                throw new AlgebricksException("Error cleaning the result dataset. This should not happen.");
-            }
-
-            // Flush source dataset
-            flushDataset(hcc, metadataProvider, mdTxnCtx, dataverseNameFrom, datasetNameFrom, fromIndex.getIndexName());
-
-            // call Pregelix
-            String pregelix_home = System.getenv("PREGELIX_HOME");
-            if (pregelix_home == null) {
-                throw new AlgebricksException("PREGELIX_HOME is not defined!");
-            }
-
-            // construct command
-            ArrayList<String> cmd = new ArrayList<String>();
-            cmd.add("bin/pregelix");
-            cmd.add(pregelixStmt.getParameters().get(0)); // jar
-            cmd.add(pregelixStmt.getParameters().get(1)); // class
-            for (String s : pregelixStmt.getParameters().get(2).split(" ")) {
-                cmd.add(s);
-            }
-            cmd.add("-inputpaths");
-            cmd.add(fromSplitsPaths.toString());
-            cmd.add("-outputpath");
-            cmd.add(toSplitsPaths.toString());
-
-            StringBuilder command = new StringBuilder();
-            for (String s : cmd) {
-                command.append(s);
-                command.append(" ");
-            }
-            LOGGER.info("Running Pregelix Command: " + command.toString());
-
-            ProcessBuilder pb = new ProcessBuilder(cmd);
-            pb.directory(new File(pregelix_home));
-            pb.redirectErrorStream(true);
-
-            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-            bActiveTxn = false;
-
-            Process pr = pb.start();
-
-            int resultState = 0;
+        // Flushes source dataset.
+        FlushDatasetUtils.flushDataset(hcc, metadataProvider, mdTxnCtx, dataverseNameFrom, datasetNameFrom,
+                datasetNameFrom);
+    }
 
-            BufferedReader in = new BufferedReader(new InputStreamReader(pr.getInputStream()));
+    // Executes external shell commands.
+    private int executeExternalShellProgram(ProcessBuilder pb)
+            throws IOException, AlgebricksException, InterruptedException {
+        Process process = pb.start();
+        try (BufferedReader in = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
             String line;
             while ((line = in.readLine()) != null) {
-                System.out.println(line);
-                if (line.contains("job finished")) {
-                    resultState = 1;
-                }
+                LOGGER.info(line);
                 if (line.contains("Exception") || line.contains("Error")) {
-
+                    LOGGER.severe(line);
                     if (line.contains("Connection refused")) {
                         throw new AlgebricksException(
                                 "The connection to your Pregelix cluster was refused. Is it running? Is the port in the query correct?");
                     }
-
                     if (line.contains("Could not find or load main class")) {
                         throw new AlgebricksException(
                                 "The main class of your Pregelix query was not found. Is the path to your .jar file correct?");
                     }
-
                     if (line.contains("ClassNotFoundException")) {
                         throw new AlgebricksException(
                                 "The vertex class of your Pregelix query was not found. Does it exist? Is the spelling correct?");
                     }
-
-                    if (line.contains("HyracksException")) {
-                        throw new AlgebricksException(
-                                "Something went wrong executing your Pregelix Job (HyracksException). Check the configuration of STORAGE_BUFFERCACHE_PAGESIZE and STORAGE_MEMORYCOMPONENT_PAGESIZE."
-                                        + "It must match the one of Asterix. You can use managix describe -admin to find out the right configuration. "
-                                        + "Check also if your datatypes in Pregelix and Asterix are matching.");
-                    }
-
-                    throw new AlgebricksException(
-                            "Something went wrong executing your Pregelix Job. Perhaps the Pregelix cluster needs to be restartet. "
-                                    + "Check the following things: Are the datatypes of Asterix and Pregelix matching? "
-                                    + "Is the server configuration correct (node names, buffer sizes, framesize)? Check the logfiles for more details.");
                 }
             }
-            pr.waitFor();
-            in.close();
-
-            if (resultState != 1) {
-                throw new AlgebricksException(
-                        "Something went wrong executing your Pregelix Job. Perhaps the Pregelix cluster needs to be restartet. "
-                                + "Check the following things: Are the datatypes of Asterix and Pregelix matching? "
-                                + "Is the server configuration correct (node names, buffer sizes, framesize)? Check the logfiles for more details.");
-            }
-        } catch (Exception e) {
-            if (bActiveTxn) {
-                abort(e, e, mdTxnCtx);
-            }
-            throw e;
-        } finally {
-            MetadataLockManager.INSTANCE.pregelixEnd(dataverseNameFrom, datasetNameFrom, datasetNameTo);
+            process.waitFor();
         }
+        // Gets the exit value of the program.
+        int resultState = process.exitValue();
+        return resultState;
     }
 
-    private void flushDataset(IHyracksClientConnection hcc, AqlMetadataProvider metadataProvider,
-            MetadataTransactionContext mdTxnCtx, String dataverseName, String datasetName, String indexName)
-                    throws Exception {
-        AsterixCompilerProperties compilerProperties = AsterixAppContextInfo.getInstance().getCompilerProperties();
-        int frameSize = compilerProperties.getFrameSize();
-        JobSpecification spec = new JobSpecification(frameSize);
-
-        RecordDescriptor[] rDescs = new RecordDescriptor[] { new RecordDescriptor(new ISerializerDeserializer[] {}) };
-        AlgebricksMetaOperatorDescriptor emptySource = new AlgebricksMetaOperatorDescriptor(spec, 0, 1,
-                new IPushRuntimeFactory[] { new EmptyTupleSourceRuntimeFactory() }, rDescs);
-
-        org.apache.asterix.common.transactions.JobId jobId = JobIdFactory.generateJobId();
-        Dataset dataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
-        FlushDatasetOperatorDescriptor flushOperator = new FlushDatasetOperatorDescriptor(spec, jobId,
-                dataset.getDatasetId());
-
-        spec.connect(new OneToOneConnectorDescriptor(spec), emptySource, 0, flushOperator, 0);
-
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = metadataProvider
-                .splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, indexName,
-                        dataset.getDatasetDetails().isTemp());
-        AlgebricksPartitionConstraint primaryPartitionConstraint = primarySplitsAndConstraint.second;
-
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, emptySource,
-                primaryPartitionConstraint);
-
-        JobEventListenerFactory jobEventListenerFactory = new JobEventListenerFactory(jobId, true);
-        spec.setJobletEventListenerFactory(jobEventListenerFactory);
-        runJob(hcc, spec, true);
-    }
-
-    private JobId runJob(IHyracksClientConnection hcc, JobSpecification spec, boolean waitForCompletion)
-            throws Exception {
-        JobId[] jobIds = executeJobArray(hcc, new Job[] { new Job(spec) }, sessionConfig.out(), waitForCompletion);
-        return jobIds[0];
-    }
-
-    public JobId[] executeJobArray(IHyracksClientConnection hcc, Job[] jobs, PrintWriter out, boolean waitForCompletion)
-            throws Exception {
-        JobId[] startedJobIds = new JobId[jobs.length];
-        for (int i = 0; i < jobs.length; i++) {
-            JobSpecification spec = jobs[i].getJobSpec();
-            spec.setMaxReattempts(0);
-            JobId jobId = hcc.startJob(spec);
-            startedJobIds[i] = jobId;
-            if (waitForCompletion) {
-                hcc.waitForCompletion(jobId);
+    // Constructs a Pregelix command line.
+    private List<String> constructPregelixCommand(RunStatement pregelixStmt, String fromDataverseName,
+            String fromDatasetName, String toDataverseName, String toDatasetName) {
+        // Constructs AsterixDB parameters, e.g., URL, source dataset and sink dataset.
+        AsterixExternalProperties externalProperties = AsterixAppContextInfo.getInstance().getExternalProperties();
+        AsterixClusterProperties clusterProperties = AsterixClusterProperties.INSTANCE;
+        String clientIP = clusterProperties.getCluster().getMasterNode().getClientIp();
+        StringBuilder asterixdbParameterBuilder = new StringBuilder();
+        asterixdbParameterBuilder.append(
+                "pregelix.asterixdb.url=" + "http://" + clientIP + ":" + externalProperties.getAPIServerPort() + ",");
+        asterixdbParameterBuilder.append("pregelix.asterixdb.source=true,");
+        asterixdbParameterBuilder.append("pregelix.asterixdb.sink=true,");
+        asterixdbParameterBuilder.append("pregelix.asterixdb.input.dataverse=" + fromDataverseName + ",");
+        asterixdbParameterBuilder.append("pregelix.asterixdb.input.dataset=" + fromDatasetName + ",");
+        asterixdbParameterBuilder.append("pregelix.asterixdb.output.dataverse=" + toDataverseName + ",");
+        asterixdbParameterBuilder.append("pregelix.asterixdb.output.dataset=" + toDatasetName + ",");
+        asterixdbParameterBuilder.append("pregelix.asterixdb.output.cleanup=false,");
+
+        // construct command
+        List<String> cmds = new ArrayList<String>();
+        cmds.add("bin/pregelix");
+        cmds.add(pregelixStmt.getParameters().get(0)); // jar
+        cmds.add(pregelixStmt.getParameters().get(1)); // class
+
+        String customizedPregelixProperty = "-cust-prop";
+        String inputConverterClassKey = "pregelix.asterixdb.input.converterclass";
+        String inputConverterClassValue = "=org.apache.pregelix.example.converter.VLongIdInputVertexConverter,";
+        String outputConverterClassKey = "pregelix.asterixdb.output.converterclass";
+        String outputConverterClassValue = "=org.apache.pregelix.example.converter.VLongIdOutputVertexConverter,";
+        boolean custPropAdded = false;
+        boolean meetCustProp = false;
+        // User parameters.
+        for (String s : pregelixStmt.getParameters().get(2).split(" ")) {
+            if (meetCustProp) {
+                if (!s.contains(inputConverterClassKey)) {
+                    asterixdbParameterBuilder.append(inputConverterClassKey + inputConverterClassValue);
+                }
+                if (!s.contains(outputConverterClassKey)) {
+                    asterixdbParameterBuilder.append(outputConverterClassKey + outputConverterClassValue);
+                }
+                cmds.add(asterixdbParameterBuilder.toString() + s);
+                meetCustProp = false;
+                custPropAdded = true;
+                continue;
+            }
+            cmds.add(s);
+            if (s.equals(customizedPregelixProperty)) {
+                meetCustProp = true;
             }
         }
-        return startedJobIds;
+
+        if (!custPropAdded) {
+            cmds.add(customizedPregelixProperty);
+            // Appends default converter classes to asterixdbParameterBuilder.
+            asterixdbParameterBuilder.append(inputConverterClassKey + inputConverterClassValue);
+            asterixdbParameterBuilder.append(outputConverterClassKey + outputConverterClassValue);
+            // Remove the last comma.
+            asterixdbParameterBuilder.delete(asterixdbParameterBuilder.length() - 1,
+                    asterixdbParameterBuilder.length());
+            cmds.add(asterixdbParameterBuilder.toString());
+        }
+        return cmds;
     }
 
     private String getActiveDataverseName(String dataverse) throws AlgebricksException {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/751315f8/asterix-app/src/main/java/org/apache/asterix/drivers/AsterixCLI.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/drivers/AsterixCLI.java b/asterix-app/src/main/java/org/apache/asterix/drivers/AsterixCLI.java
index b4fa1d3..823cc4e 100644
--- a/asterix-app/src/main/java/org/apache/asterix/drivers/AsterixCLI.java
+++ b/asterix-app/src/main/java/org/apache/asterix/drivers/AsterixCLI.java
@@ -80,11 +80,11 @@ public class AsterixCLI {
         File lsn = new File("last_checkpoint_lsn");
         lsn.deleteOnExit();
 
-        AsterixHyracksIntegrationUtil.init();
+        AsterixHyracksIntegrationUtil.init(false);
     }
 
     public static void tearDown() throws Exception {
-        AsterixHyracksIntegrationUtil.deinit();
+        AsterixHyracksIntegrationUtil.deinit(false);
     }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/751315f8/asterix-app/src/main/java/org/apache/asterix/file/DatasetOperations.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/file/DatasetOperations.java b/asterix-app/src/main/java/org/apache/asterix/file/DatasetOperations.java
index b7c210c..013e021 100644
--- a/asterix-app/src/main/java/org/apache/asterix/file/DatasetOperations.java
+++ b/asterix-app/src/main/java/org/apache/asterix/file/DatasetOperations.java
@@ -117,7 +117,6 @@ public class DatasetOperations {
         Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(dataset,
                 metadataProvider.getMetadataTxnContext());
 
-        // The index drop operation should be persistent regardless of temp datasets or permanent dataset
         IndexDropOperatorDescriptor primaryBtreeDrop = new IndexDropOperatorDescriptor(specPrimary,
                 AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
                 splitsAndConstraint.first, new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
@@ -125,7 +124,7 @@ public class DatasetOperations {
                         new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
                         AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
                         storageProperties.getBloomFilterFalsePositiveRate(), true, filterTypeTraits,
-                        filterCmpFactories, btreeFields, filterFields, true));
+                        filterCmpFactories, btreeFields, filterFields, !temp));
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(specPrimary, primaryBtreeDrop,
                 splitsAndConstraint.second);
 
@@ -180,7 +179,6 @@ public class DatasetOperations {
         ILocalResourceFactoryProvider localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(
                 localResourceMetadata, LocalResource.LSMBTreeResource);
 
-        // The index create operation should be persistent regardless of temp datasets or permanent dataset
         TreeIndexCreateOperatorDescriptor indexCreateOp = new TreeIndexCreateOperatorDescriptor(spec,
                 AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
                 splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields,
@@ -189,7 +187,7 @@ public class DatasetOperations {
                                 .getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
                         LSMBTreeIOOperationCallbackFactory.INSTANCE, storageProperties
                                 .getBloomFilterFalsePositiveRate(), true, filterTypeTraits, filterCmpFactories,
-                        btreeFields, filterFields, true), localResourceFactoryProvider,
+                        btreeFields, filterFields, !temp), localResourceFactoryProvider,
                 NoOpOperationCallbackFactory.INSTANCE);
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexCreateOp,
                 splitsAndConstraint.second);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/751315f8/asterix-app/src/main/java/org/apache/asterix/file/IndexOperations.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/file/IndexOperations.java b/asterix-app/src/main/java/org/apache/asterix/file/IndexOperations.java
index ebac99f..c5870a6 100644
--- a/asterix-app/src/main/java/org/apache/asterix/file/IndexOperations.java
+++ b/asterix-app/src/main/java/org/apache/asterix/file/IndexOperations.java
@@ -113,7 +113,7 @@ public class IndexOperations {
                         dataset.getDatasetId()), compactionInfo.first, compactionInfo.second,
                         new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
                         AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
-                        storageProperties.getBloomFilterFalsePositiveRate(), false, null, null, null, null, true));
+                        storageProperties.getBloomFilterFalsePositiveRate(), false, null, null, null, null, !temp));
         AlgebricksPartitionConstraintHelper
                 .setPartitionConstraintInJobSpec(spec, btreeDrop, splitsAndConstraint.second);
         spec.addRoot(btreeDrop);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/751315f8/asterix-app/src/main/java/org/apache/asterix/file/SecondaryBTreeOperationsHelper.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/file/SecondaryBTreeOperationsHelper.java b/asterix-app/src/main/java/org/apache/asterix/file/SecondaryBTreeOperationsHelper.java
index 34517ca..7c14e5d 100644
--- a/asterix-app/src/main/java/org/apache/asterix/file/SecondaryBTreeOperationsHelper.java
+++ b/asterix-app/src/main/java/org/apache/asterix/file/SecondaryBTreeOperationsHelper.java
@@ -96,13 +96,12 @@ public class SecondaryBTreeOperationsHelper extends SecondaryIndexOperationsHelp
                     secondaryBTreeFields, secondaryFilterFields);
             localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(localResourceMetadata,
                     LocalResource.LSMBTreeResource);
-            // The index create operation should be persistent regardless of temp datasets or permanent dataset.
             indexDataflowHelperFactory = new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
                     dataset.getDatasetId()), mergePolicyFactory, mergePolicyFactoryProperties,
                     new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
                     AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
                     storageProperties.getBloomFilterFalsePositiveRate(), false, filterTypeTraits, filterCmpFactories,
-                    secondaryBTreeFields, secondaryFilterFields, true);
+                    secondaryBTreeFields, secondaryFilterFields, !dataset.getDatasetDetails().isTemp());
         } else {
             // External dataset local resource and dataflow helper
             int[] buddyBreeFields = new int[] { numSecondaryKeys };
@@ -197,7 +196,7 @@ public class SecondaryBTreeOperationsHelper extends SecondaryIndexOperationsHelp
             spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
             return spec;
         } else {
-            // Create dummy key provider for feeding the primary index scan. 
+            // Create dummy key provider for feeding the primary index scan.
             AbstractOperatorDescriptor keyProviderOp = createDummyKeyProviderOp(spec);
 
             // Create primary index scan op.

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/751315f8/asterix-app/src/main/java/org/apache/asterix/util/FlushDatasetUtils.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/util/FlushDatasetUtils.java b/asterix-app/src/main/java/org/apache/asterix/util/FlushDatasetUtils.java
new file mode 100644
index 0000000..7536c70
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/util/FlushDatasetUtils.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.util;
+
+import org.apache.asterix.common.config.AsterixCompilerProperties;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.declared.AqlMetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.om.util.AsterixAppContextInfo;
+import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
+import org.apache.asterix.runtime.operators.std.FlushDatasetOperatorDescriptor;
+import org.apache.asterix.transaction.management.service.transaction.JobIdFactory;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import org.apache.hyracks.algebricks.runtime.operators.std.EmptyTupleSourceRuntimeFactory;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+
+public class FlushDatasetUtils {
+
+    public static void flushDataset(IHyracksClientConnection hcc, AqlMetadataProvider metadataProvider,
+            MetadataTransactionContext mdTxnCtx, String dataverseName, String datasetName, String indexName)
+                    throws Exception {
+        AsterixCompilerProperties compilerProperties = AsterixAppContextInfo.getInstance().getCompilerProperties();
+        int frameSize = compilerProperties.getFrameSize();
+        JobSpecification spec = new JobSpecification(frameSize);
+
+        RecordDescriptor[] rDescs = new RecordDescriptor[] { new RecordDescriptor(new ISerializerDeserializer[] {}) };
+        AlgebricksMetaOperatorDescriptor emptySource = new AlgebricksMetaOperatorDescriptor(spec, 0, 1,
+                new IPushRuntimeFactory[] { new EmptyTupleSourceRuntimeFactory() }, rDescs);
+
+        org.apache.asterix.common.transactions.JobId jobId = JobIdFactory.generateJobId();
+        Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName);
+        FlushDatasetOperatorDescriptor flushOperator = new FlushDatasetOperatorDescriptor(spec, jobId,
+                dataset.getDatasetId());
+
+        spec.connect(new OneToOneConnectorDescriptor(spec), emptySource, 0, flushOperator, 0);
+
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = metadataProvider
+                .splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, indexName,
+                        dataset.getDatasetDetails().isTemp());
+        AlgebricksPartitionConstraint primaryPartitionConstraint = primarySplitsAndConstraint.second;
+
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, emptySource,
+                primaryPartitionConstraint);
+
+        JobEventListenerFactory jobEventListenerFactory = new JobEventListenerFactory(jobId, true);
+        spec.setJobletEventListenerFactory(jobEventListenerFactory);
+        JobUtils.runJob(hcc, spec, true);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/751315f8/asterix-app/src/main/java/org/apache/asterix/util/JobUtils.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/util/JobUtils.java b/asterix-app/src/main/java/org/apache/asterix/util/JobUtils.java
new file mode 100644
index 0000000..fb50b0c
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/util/JobUtils.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.util;
+
+import org.apache.asterix.api.common.Job;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+
+public class JobUtils {
+
+    public static JobId runJob(IHyracksClientConnection hcc, JobSpecification spec, boolean waitForCompletion)
+            throws Exception {
+        JobId[] jobIds = executeJobArray(hcc, new Job[] { new Job(spec) }, waitForCompletion);
+        return jobIds[0];
+    }
+
+    public static JobId[] executeJobArray(IHyracksClientConnection hcc, Job[] jobs, boolean waitForCompletion)
+            throws Exception {
+        JobId[] startedJobIds = new JobId[jobs.length];
+        for (int i = 0; i < jobs.length; i++) {
+            JobSpecification spec = jobs[i].getJobSpec();
+            spec.setMaxReattempts(0);
+            JobId jobId = hcc.startJob(spec);
+            startedJobIds[i] = jobId;
+            if (waitForCompletion) {
+                hcc.waitForCompletion(jobId);
+            }
+        }
+        return startedJobIds;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/751315f8/asterix-app/src/main/resources/asterix-build-configuration.xml
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/resources/asterix-build-configuration.xml b/asterix-app/src/main/resources/asterix-build-configuration.xml
index f8f33d6..8d0b7f3 100644
--- a/asterix-app/src/main/resources/asterix-build-configuration.xml
+++ b/asterix-app/src/main/resources/asterix-build-configuration.xml
@@ -66,6 +66,10 @@
 		<name>compiler.joinmemory</name>
 		<value>163840</value>
 	</property>
+        <property>
+               <name>compiler.pregelix.home</name>
+               <value>~/pregelix</value>
+        </property>
 	<property>
 		<name>storage.buffercache.pagesize</name>
 		<value>32768</value>

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/751315f8/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorAPIServletTest.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorAPIServletTest.java b/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorAPIServletTest.java
index 6da127c..80f5f5a 100644
--- a/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorAPIServletTest.java
+++ b/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorAPIServletTest.java
@@ -103,7 +103,9 @@ public class ConnectorAPIServletTest {
                 new ByteArrayInputStream(outputStream.toByteArray())));
         JSONObject actualResponse = new JSONObject(tokener);
 
-        // Checks the data type of the dataset.
+        // Checks the temp-or-not, primary key, data type of the dataset.
+        boolean temp = actualResponse.getBoolean("temp");
+        Assert.assertFalse(temp);
         String primaryKey = actualResponse.getString("keys");
         Assert.assertEquals("DataverseName,DatasetName", primaryKey);
         ARecordType recordType = (ARecordType) JSONDeserializerForTypes.convertFromJSON((JSONObject) actualResponse
@@ -144,20 +146,23 @@ public class ConnectorAPIServletTest {
         nodeMap.put("nc2", mockInfo2);
         PA.invokeMethod(servlet,
                 "formResponseObject(org.json.JSONObject, org.apache.hyracks.dataflow.std.file.FileSplit[], "
-                        + "org.apache.asterix.om.types.ARecordType, java.lang.String, java.util.Map)", actualResponse,
-                splits, recordType, primaryKey, nodeMap);
+                        + "org.apache.asterix.om.types.ARecordType, java.lang.String, boolean, java.util.Map)",
+                actualResponse, splits, recordType, primaryKey, true, nodeMap);
 
         // Constructs expected response.
         JSONObject expectedResponse = new JSONObject();
+        expectedResponse.put("temp", true);
         expectedResponse.put("keys", primaryKey);
         expectedResponse.put("type", recordType.toJSON());
         JSONArray splitsArray = new JSONArray();
         JSONObject element1 = new JSONObject();
         element1.put("ip", "127.0.0.1");
         element1.put("path", splits[0].getLocalFile().getFile().getAbsolutePath());
+        element1.put("ioDeviceId", 0);
         JSONObject element2 = new JSONObject();
         element2.put("ip", "127.0.0.2");
         element2.put("path", splits[1].getLocalFile().getFile().getAbsolutePath());
+        element2.put("ioDeviceId", 0);
         splitsArray.put(element1);
         splitsArray.put(element2);
         expectedResponse.put("splits", splitsArray);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/751315f8/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java b/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java
new file mode 100644
index 0000000..89350aa
--- /dev/null
+++ b/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.aql.translator;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.asterix.api.common.SessionConfig;
+import org.apache.asterix.common.config.AsterixExternalProperties;
+import org.apache.asterix.compiler.provider.AqlCompilationProvider;
+import org.apache.asterix.event.schema.cluster.Cluster;
+import org.apache.asterix.event.schema.cluster.MasterNode;
+import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.lang.common.statement.RunStatement;
+import org.apache.asterix.om.util.AsterixAppContextInfo;
+import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.junit.Test;
+
+import junit.extensions.PA;
+import junit.framework.Assert;
+
+@SuppressWarnings({ "unchecked", "deprecation" })
+public class QueryTranslatorTest {
+
+    @Test
+    public void test() throws Exception {
+        List<Statement> statements = new ArrayList<Statement>();
+        SessionConfig mockSessionConfig = mock(SessionConfig.class);
+        RunStatement mockRunStatement = mock(RunStatement.class);
+
+        // Mocks AsterixAppContextInfo.
+        AsterixAppContextInfo mockAsterixAppContextInfo = mock(AsterixAppContextInfo.class);
+        setFinalStaticField(AsterixAppContextInfo.class.getDeclaredField("INSTANCE"), mockAsterixAppContextInfo);
+        AsterixExternalProperties mockAsterixExternalProperties = mock(AsterixExternalProperties.class);
+        when(mockAsterixAppContextInfo.getExternalProperties()).thenReturn(mockAsterixExternalProperties);
+        when(mockAsterixExternalProperties.getAPIServerPort()).thenReturn(19002);
+
+        // Mocks AsterixClusterProperties.
+        Cluster mockCluster = mock(Cluster.class);
+        MasterNode mockMasterNode = mock(MasterNode.class);
+        AsterixClusterProperties mockClusterProperties = mock(AsterixClusterProperties.class);
+        setFinalStaticField(AsterixClusterProperties.class.getDeclaredField("INSTANCE"), mockClusterProperties);
+        when(mockClusterProperties.getCluster()).thenReturn(mockCluster);
+        when(mockCluster.getMasterNode()).thenReturn(mockMasterNode);
+        when(mockMasterNode.getClientIp()).thenReturn("127.0.0.1");
+
+        QueryTranslator aqlTranslator = new QueryTranslator(statements, mockSessionConfig,
+                new AqlCompilationProvider());
+        List<String> parameters = new ArrayList<String>();
+        parameters.add("examples/pregelix-example-jar-with-dependencies.jar");
+        parameters.add("org.apache.pregelix.example.PageRankVertex");
+        parameters.add("-ip 10.0.2.15 -port 3199");
+        when(mockRunStatement.getParameters()).thenReturn(parameters);
+        // Test a customer command without "-cust-prop".
+        List<String> cmds = (List<String>) PA.invokeMethod(aqlTranslator,
+                "constructPregelixCommand(org.apache.asterix.lang.common.statement.RunStatement,"
+                        + "String,String,String,String)",
+                mockRunStatement, "fromDataverse", "fromDataset", "toDataverse", "toDataset");
+        List<String> expectedCmds = Arrays.asList(new String[] { "bin/pregelix",
+                "examples/pregelix-example-jar-with-dependencies.jar", "org.apache.pregelix.example.PageRankVertex",
+                "-ip", "10.0.2.15", "-port", "3199", "-cust-prop",
+                "pregelix.asterixdb.url=http://127.0.0.1:19002,pregelix.asterixdb.source=true,pregelix.asterixdb.sink=true,pregelix.asterixdb.input.dataverse=fromDataverse,pregelix.asterixdb.input.dataset=fromDataset,pregelix.asterixdb.output.dataverse=toDataverse,pregelix.asterixdb.output.dataset=toDataset,pregelix.asterixdb.output.cleanup=false,pregelix.asterixdb.input.converterclass=org.apache.pregelix.example.converter.VLongIdInputVertexConverter,pregelix.asterixdb.output.converterclass=org.apache.pregelix.example.converter.VLongIdOutputVertexConverter" });
+        Assert.assertEquals(cmds, expectedCmds);
+
+        parameters.remove(parameters.size() - 1);
+        parameters.add("-ip 10.0.2.15 -port 3199 -cust-prop "
+                + "pregelix.asterixdb.input.converterclass=org.apache.pregelix.example.converter.TestInputVertexConverter,"
+                + "pregelix.asterixdb.output.converterclass=org.apache.pregelix.example.converter.TestOutputVertexConverter");
+        // Test a customer command with "-cust-prop".
+        cmds = (List<String>) PA.invokeMethod(aqlTranslator,
+                "constructPregelixCommand(org.apache.asterix.lang.common.statement.RunStatement,"
+                        + "String,String,String,String)",
+                mockRunStatement, "fromDataverse", "fromDataset", "toDataverse", "toDataset");
+        expectedCmds = Arrays.asList(new String[] { "bin/pregelix",
+                "examples/pregelix-example-jar-with-dependencies.jar", "org.apache.pregelix.example.PageRankVertex",
+                "-ip", "10.0.2.15", "-port", "3199", "-cust-prop",
+                "pregelix.asterixdb.url=http://127.0.0.1:19002,pregelix.asterixdb.source=true,pregelix.asterixdb.sink=true,pregelix.asterixdb.input.dataverse=fromDataverse,pregelix.asterixdb.input.dataset=fromDataset,pregelix.asterixdb.output.dataverse=toDataverse,pregelix.asterixdb.output.dataset=toDataset,pregelix.asterixdb.output.cleanup=false,pregelix.asterixdb.input.converterclass=org.apache.pregelix.example.converter.TestInputVertexConverter,pregelix.asterixdb.output.converterclass=org.apache.pregelix.example.converter.TestOutputVertexConverter" });
+        Assert.assertEquals(cmds, expectedCmds);
+    }
+
+    private void setFinalStaticField(Field field, Object newValue) throws Exception {
+        field.setAccessible(true);
+        // remove final modifier from field
+        Field modifiersField = Field.class.getDeclaredField("modifiers");
+        modifiersField.setAccessible(true);
+        modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
+        field.set(null, newValue);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/751315f8/asterix-app/src/test/java/org/apache/asterix/test/dml/DmlTest.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/java/org/apache/asterix/test/dml/DmlTest.java b/asterix-app/src/test/java/org/apache/asterix/test/dml/DmlTest.java
index dff1c18..0a22f1c 100644
--- a/asterix-app/src/test/java/org/apache/asterix/test/dml/DmlTest.java
+++ b/asterix-app/src/test/java/org/apache/asterix/test/dml/DmlTest.java
@@ -54,7 +54,7 @@ public class DmlTest {
         }
         outdir.mkdirs();
 
-        AsterixHyracksIntegrationUtil.init();
+        AsterixHyracksIntegrationUtil.init(true);
         Reader loadReader = new BufferedReader(
                 new InputStreamReader(new FileInputStream(LOAD_FOR_ENLIST_FILE), "UTF-8"));
         AsterixJavaClient asterixLoad = new AsterixJavaClient(
@@ -69,7 +69,7 @@ public class DmlTest {
         }
         asterixLoad.execute();
 
-        AsterixHyracksIntegrationUtil.deinit();
+        AsterixHyracksIntegrationUtil.deinit(true);
         for (String d : ASTERIX_DATA_DIRS) {
             testExecutor.deleteRec(new File(d));
         }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/751315f8/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTest.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTest.java b/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTest.java
index 3788606..376b2ff 100644
--- a/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTest.java
+++ b/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTest.java
@@ -23,12 +23,9 @@ import java.util.ArrayList;
 import java.util.Collection;
 
 import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
-import org.apache.asterix.common.config.AsterixPropertiesAccessor;
-import org.apache.asterix.common.config.AsterixTransactionProperties;
 import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.test.aql.TestExecutor;
 import org.apache.asterix.testframework.context.TestCaseContext;
-import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -50,7 +47,6 @@ public class MetadataTest {
             .join(new String[] { "src", "test", "resources", "metadata" + File.separator }, File.separator);
     private static final String TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml";
 
-    private static AsterixTransactionProperties txnProperties;
     private static final TestExecutor testExecutor = new TestExecutor();
 
     @BeforeClass
@@ -58,18 +54,12 @@ public class MetadataTest {
         System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, TEST_CONFIG_FILE_NAME);
         File outdir = new File(PATH_ACTUAL);
         outdir.mkdirs();
-
-        AsterixPropertiesAccessor apa = new AsterixPropertiesAccessor();
-        txnProperties = new AsterixTransactionProperties(apa);
-
-        deleteTransactionLogs();
-
-        AsterixHyracksIntegrationUtil.init();
+        AsterixHyracksIntegrationUtil.init(true);
     }
 
     @AfterClass
     public static void tearDown() throws Exception {
-        AsterixHyracksIntegrationUtil.deinit();
+        AsterixHyracksIntegrationUtil.deinit(true);
         File outdir = new File(PATH_ACTUAL);
         File[] files = outdir.listFiles();
         if (files == null || files.length == 0) {
@@ -82,15 +72,6 @@ public class MetadataTest {
         }
     }
 
-    private static void deleteTransactionLogs() throws Exception {
-        for (String ncId : AsterixHyracksIntegrationUtil.getNcNames()) {
-            File log = new File(txnProperties.getLogDirectory(ncId));
-            if (log.exists()) {
-                FileUtils.deleteDirectory(log);
-            }
-        }
-    }
-
     @Parameters
     public static Collection<Object[]> tests() throws Exception {
         Collection<Object[]> testArgs = new ArrayList<Object[]>();

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/751315f8/asterix-app/src/test/java/org/apache/asterix/test/optimizer/OptimizerTest.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/java/org/apache/asterix/test/optimizer/OptimizerTest.java b/asterix-app/src/test/java/org/apache/asterix/test/optimizer/OptimizerTest.java
index 092f898..2f8a910 100644
--- a/asterix-app/src/test/java/org/apache/asterix/test/optimizer/OptimizerTest.java
+++ b/asterix-app/src/test/java/org/apache/asterix/test/optimizer/OptimizerTest.java
@@ -30,8 +30,6 @@ import java.util.logging.Logger;
 
 import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
 import org.apache.asterix.api.java.AsterixJavaClient;
-import org.apache.asterix.common.config.AsterixPropertiesAccessor;
-import org.apache.asterix.common.config.AsterixTransactionProperties;
 import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.compiler.provider.AqlCompilationProvider;
@@ -40,7 +38,6 @@ import org.apache.asterix.external.dataset.adapter.FileSystemBasedAdapter;
 import org.apache.asterix.external.util.IdentitiyResolverFactory;
 import org.apache.asterix.test.base.AsterixTestHelper;
 import org.apache.asterix.test.common.TestHelper;
-import org.apache.commons.io.FileUtils;
 import org.junit.AfterClass;
 import org.junit.Assume;
 import org.junit.BeforeClass;
@@ -71,8 +68,6 @@ public class OptimizerTest {
     private static final String TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml";
     private static final ILangCompilationProvider compilationProvider = new AqlCompilationProvider();
 
-    private static AsterixTransactionProperties txnProperties;
-
     @BeforeClass
     public static void setUp() throws Exception {
         // File outdir = new File(PATH_ACTUAL);
@@ -82,27 +77,13 @@ public class OptimizerTest {
         File outdir = new File(PATH_ACTUAL);
         outdir.mkdirs();
 
-        AsterixPropertiesAccessor apa = new AsterixPropertiesAccessor();
-        txnProperties = new AsterixTransactionProperties(apa);
-
-        deleteTransactionLogs();
-
-        AsterixHyracksIntegrationUtil.init();
+        AsterixHyracksIntegrationUtil.init(true);
         // Set the node resolver to be the identity resolver that expects node names
         // to be node controller ids; a valid assumption in test environment.
         System.setProperty(FileSystemBasedAdapter.NODE_RESOLVER_FACTORY_PROPERTY,
                 IdentitiyResolverFactory.class.getName());
     }
 
-    private static void deleteTransactionLogs() throws Exception {
-        for (String ncId : AsterixHyracksIntegrationUtil.getNcNames()) {
-            File log = new File(txnProperties.getLogDirectory(ncId));
-            if (log.exists()) {
-                FileUtils.deleteDirectory(log);
-            }
-        }
-    }
-
     @AfterClass
     public static void tearDown() throws Exception {
         // _bootstrap.stop();
@@ -111,7 +92,7 @@ public class OptimizerTest {
         if (files == null || files.length == 0) {
             outdir.delete();
         }
-        AsterixHyracksIntegrationUtil.deinit();
+        AsterixHyracksIntegrationUtil.deinit(true);
     }
 
     private static void suiteBuild(File dir, Collection<Object[]> testArgs, String path) {


Mime
View raw message