asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ti...@apache.org
Subject [2/4] asterixdb git commit: Fix async result delivery
Date Fri, 10 Feb 2017 17:38:55 GMT
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/392bbbc0/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
index ff30df3..b5fbcf5 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
@@ -32,12 +32,15 @@ import java.io.StringWriter;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.Inet4Address;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -72,6 +75,7 @@ import org.apache.http.impl.client.HttpClients;
 import org.apache.http.impl.client.StandardHttpRequestRetryHandler;
 import org.apache.http.util.EntityUtils;
 import org.apache.hyracks.util.StorageUtil;
+import org.junit.Assert;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.DeserializationFeature;
@@ -91,12 +95,21 @@ public class TestExecutor {
     private static final long MAX_URL_LENGTH = 2000l;
     private static final Pattern JAVA_BLOCK_COMMENT_PATTERN = Pattern.compile("/\\*.*\\*/",
             Pattern.MULTILINE | Pattern.DOTALL);
+    private static final Pattern JAVA_LINE_COMMENT_PATTERN = Pattern.compile("//.*$", Pattern.MULTILINE);
+    private static final Pattern SHELL_LINE_COMMENT_PATTERN = Pattern.compile("#.*$", Pattern.MULTILINE);
     private static final Pattern REGEX_LINES_PATTERN = Pattern.compile("^(-)?/(.*)/([im]*)$");
     private static final Pattern POLL_TIMEOUT_PATTERN = Pattern.compile("polltimeoutsecs=(\\d+)(\\D|$)",
             Pattern.MULTILINE);
     private static final Pattern POLL_DELAY_PATTERN = Pattern.compile("polldelaysecs=(\\d+)(\\D|$)", Pattern.MULTILINE);
+    private static final Pattern HANDLE_VARIABLE_PATTERN = Pattern.compile("handlevariable=(\\w+)");
+    private static final Pattern VARIABLE_REF_PATTERN = Pattern.compile("\\$(\\w+)");
+
     public static final int TRUNCATE_THRESHOLD = 16384;
 
+    public static final String DELIVERY_ASYNC = "async";
+    public static final String DELIVERY_DEFERRED = "deferred";
+    public static final String DELIVERY_IMMEDIATE = "immediate";
+
     private static Method managixExecuteMethod = null;
     private static final HashMap<Integer, ITestServer> runningTestServers = new HashMap<>();
 
@@ -376,8 +389,13 @@ public class TestExecutor {
 
     // For tests where you simply want the byte-for-byte output.
     private static void writeOutputToFile(File actualFile, InputStream resultStream) throws Exception {
-        if (!actualFile.getParentFile().mkdirs()) {
-            LOGGER.warning("Unable to create actual file parent dir: " + actualFile.getParentFile());
+        final File parentDir = actualFile.getParentFile();
+        if (!parentDir.isDirectory()) {
+            if (parentDir.exists()) {
+                LOGGER.warning("Actual file parent \"" + parentDir + "\" exists but is not a directory");
+            } else if (!parentDir.mkdirs()) {
+                LOGGER.warning("Unable to create actual file parent dir: " + parentDir);
+            }
         }
         try (FileOutputStream out = new FileOutputStream(actualFile)) {
             IOUtils.copy(resultStream, out);
@@ -424,38 +442,30 @@ public class TestExecutor {
         return httpResponse;
     }
 
-    public InputStream executeQuery(String str, OutputFormat fmt, String url, List<CompilationUnit.Parameter> params)
+    public InputStream executeQuery(String str, OutputFormat fmt, URI uri, List<CompilationUnit.Parameter> params)
             throws Exception {
-        HttpUriRequest method = constructHttpMethod(str, url, "query", false, params);
+        HttpUriRequest method = constructHttpMethod(str, uri, "query", false, params);
         // Set accepted output response type
         method.setHeader("Accept", fmt.mimeType());
         HttpResponse response = executeAndCheckHttpRequest(method);
         return response.getEntity().getContent();
     }
 
-    public InputStream executeQueryService(String str, String url) throws Exception {
-        return executeQueryService(str, OutputFormat.CLEAN_JSON, url, new ArrayList<>(), false);
+    public InputStream executeQueryService(String str, URI uri) throws Exception {
+        return executeQueryService(str, OutputFormat.CLEAN_JSON, uri, new ArrayList<>(), false);
     }
 
-    public InputStream executeQueryService(String str, OutputFormat fmt, String url,
+    public InputStream executeQueryService(String str, OutputFormat fmt, URI uri,
             List<CompilationUnit.Parameter> params, boolean jsonEncoded) throws Exception {
         setParam(params, "format", fmt.mimeType());
-        HttpUriRequest method = jsonEncoded ? constructPostMethodJson(str, url, "statement", params)
-                : constructPostMethodUrl(str, url, "statement", params);
+        HttpUriRequest method = jsonEncoded ? constructPostMethodJson(str, uri, "statement", params)
+                : constructPostMethodUrl(str, uri, "statement", params);
         // Set accepted output response type
         method.setHeader("Accept", OutputFormat.CLEAN_JSON.mimeType());
         HttpResponse response = executeHttpRequest(method);
         return response.getEntity().getContent();
     }
 
-    public InputStream executeQueryService(String statement, OutputFormat fmt, String url,
-            List<CompilationUnit.Parameter> params, boolean jsonEncoded, String deferred) throws Exception {
-        setParam(params, "mode", deferred);
-        InputStream resultStream = executeQueryService(statement, fmt, url, params, jsonEncoded);
-        String handle = ResultExtractor.extractHandle(resultStream);
-        return getHandleResult(handle, fmt);
-    }
-
     protected void setParam(List<CompilationUnit.Parameter> params, String name, String value) {
         for (CompilationUnit.Parameter param : params) {
             if (name.equals(param.getName())) {
@@ -479,19 +489,19 @@ public class TestExecutor {
         return params;
     }
 
-    private HttpUriRequest constructHttpMethod(String statement, String endpoint, String stmtParam,
-            boolean postStmtAsParam, List<CompilationUnit.Parameter> otherParams) {
-        if (statement.length() + endpoint.length() < MAX_URL_LENGTH) {
+    private HttpUriRequest constructHttpMethod(String statement, URI uri, String stmtParam,
+            boolean postStmtAsParam, List<CompilationUnit.Parameter> otherParams) throws URISyntaxException {
+        if (statement.length() + uri.toString().length() < MAX_URL_LENGTH) {
             // Use GET for small-ish queries
-            return constructGetMethod(endpoint, injectStatement(statement, stmtParam, otherParams));
+            return constructGetMethod(uri, injectStatement(statement, stmtParam, otherParams));
         } else {
             // Use POST for bigger ones to avoid 413 FULL_HEAD
             String stmtParamName = (postStmtAsParam ? stmtParam : null);
-            return constructPostMethodUrl(statement, endpoint, stmtParamName, otherParams);
+            return constructPostMethodUrl(statement, uri, stmtParamName, otherParams);
         }
     }
 
-    private HttpUriRequest constructGetMethod(String endpoint, List<CompilationUnit.Parameter> params) {
+    private HttpUriRequest constructGetMethod(URI endpoint, List<CompilationUnit.Parameter> params) {
         RequestBuilder builder = RequestBuilder.get(endpoint);
         for (CompilationUnit.Parameter param : params) {
             builder.addParameter(param.getName(), param.getValue());
@@ -500,17 +510,16 @@ public class TestExecutor {
         return builder.build();
     }
 
-    private HttpUriRequest constructGetMethod(String endpoint, OutputFormat fmt,
+    private HttpUriRequest constructGetMethod(URI endpoint, OutputFormat fmt,
             List<CompilationUnit.Parameter> params) {
-
         HttpUriRequest method = constructGetMethod(endpoint, params);
         // Set accepted output response type
         method.setHeader("Accept", fmt.mimeType());
         return method;
     }
 
-    private HttpUriRequest constructPostMethod(String endpoint, List<CompilationUnit.Parameter> params) {
-        RequestBuilder builder = RequestBuilder.post(endpoint);
+    private HttpUriRequest constructPostMethod(URI uri, List<CompilationUnit.Parameter> params) {
+        RequestBuilder builder = RequestBuilder.post(uri);
         for (CompilationUnit.Parameter param : params) {
             builder.addParameter(param.getName(), param.getValue());
         }
@@ -518,18 +527,17 @@ public class TestExecutor {
         return builder.build();
     }
 
-    private HttpUriRequest constructPostMethod(String endpoint, OutputFormat fmt,
+    private HttpUriRequest constructPostMethod(URI uri, OutputFormat fmt,
             List<CompilationUnit.Parameter> params) {
-
-        HttpUriRequest method = constructPostMethod(endpoint, params);
+        HttpUriRequest method = constructPostMethod(uri, params);
         // Set accepted output response type
         method.setHeader("Accept", fmt.mimeType());
         return method;
     }
 
-    protected HttpUriRequest constructPostMethodUrl(String statement, String endpoint, String stmtParam,
+    protected HttpUriRequest constructPostMethodUrl(String statement, URI uri, String stmtParam,
             List<CompilationUnit.Parameter> otherParams) {
-        RequestBuilder builder = RequestBuilder.post(endpoint);
+        RequestBuilder builder = RequestBuilder.post(uri);
         if (stmtParam != null) {
             for (CompilationUnit.Parameter param : injectStatement(statement, stmtParam, otherParams)) {
                 builder.addParameter(param.getName(), param.getValue());
@@ -543,12 +551,12 @@ public class TestExecutor {
         return builder.build();
     }
 
-    protected HttpUriRequest constructPostMethodJson(String statement, String endpoint, String stmtParam,
+    protected HttpUriRequest constructPostMethodJson(String statement, URI uri, String stmtParam,
             List<CompilationUnit.Parameter> otherParams) {
         if (stmtParam == null) {
             throw new NullPointerException("Statement parameter required.");
         }
-        RequestBuilder builder = RequestBuilder.post(endpoint);
+        RequestBuilder builder = RequestBuilder.post(uri);
         ObjectMapper om = new ObjectMapper();
         ObjectNode content = om.createObjectNode();
         for (CompilationUnit.Parameter param : injectStatement(statement, stmtParam, otherParams)) {
@@ -563,23 +571,23 @@ public class TestExecutor {
         return builder.build();
     }
 
-    public InputStream executeJSONGet(OutputFormat fmt, String url) throws Exception {
-        HttpUriRequest request = constructGetMethod(url, fmt, new ArrayList<>());
+    public InputStream executeJSONGet(OutputFormat fmt, URI uri) throws Exception {
+        HttpUriRequest request = constructGetMethod(uri, fmt, new ArrayList<>());
         HttpResponse response = executeAndCheckHttpRequest(request);
         return response.getEntity().getContent();
     }
 
-    public InputStream executeJSONPost(OutputFormat fmt, String url) throws Exception {
-        HttpUriRequest request = constructPostMethod(url, fmt, new ArrayList<>());
+    public InputStream executeJSONPost(OutputFormat fmt, URI uri) throws Exception {
+        HttpUriRequest request = constructPostMethod(uri, fmt, new ArrayList<>());
         HttpResponse response = executeAndCheckHttpRequest(request);
         return response.getEntity().getContent();
     }
 
     // To execute Update statements
     // Insert and Delete statements are executed here
-    public void executeUpdate(String str, String url) throws Exception {
+    public void executeUpdate(String str, URI uri) throws Exception {
         // Create a method instance.
-        HttpUriRequest request = RequestBuilder.post(url).setEntity(new StringEntity(str, StandardCharsets.UTF_8))
+        HttpUriRequest request = RequestBuilder.post(uri).setEntity(new StringEntity(str, StandardCharsets.UTF_8))
                 .build();
 
         // Execute the method.
@@ -587,30 +595,25 @@ public class TestExecutor {
     }
 
     // Executes AQL in either async or async-defer mode.
-    public InputStream executeAnyAQLAsync(String str, boolean defer, OutputFormat fmt, String url) throws Exception {
+    public InputStream executeAnyAQLAsync(String statement, boolean defer, OutputFormat fmt, URI uri,
+            Map<String, Object> variableCtx) throws Exception {
         // Create a method instance.
-        HttpUriRequest request = RequestBuilder.post(url)
+        HttpUriRequest request = RequestBuilder.post(uri)
                 .addParameter("mode", defer ? "asynchronous-deferred" : "asynchronous")
-                .setEntity(new StringEntity(str, StandardCharsets.UTF_8)).setHeader("Accept", fmt.mimeType()).build();
+                .setEntity(new StringEntity(statement, StandardCharsets.UTF_8)).setHeader("Accept", fmt.mimeType())
+                .build();
+
+        String handleVar = getHandleVariable(statement);
 
         HttpResponse response = executeAndCheckHttpRequest(request);
         InputStream resultStream = response.getEntity().getContent();
+        String handle = IOUtils.toString(resultStream, "UTF-8");
 
-        String theHandle = IOUtils.toString(resultStream, "UTF-8");
-
-        // take the handle and parse it so results can be retrieved
-        return getHandleResult(theHandle, fmt);
-    }
-
-    private InputStream getHandleResult(String handle, OutputFormat fmt) throws Exception {
-        final String url = getEndpoint(Servlets.QUERY_RESULT);
-
-        // Create a method instance.
-        HttpUriRequest request = RequestBuilder.get(url).addParameter("handle", handle)
-                .setHeader("Accept", fmt.mimeType()).build();
-
-        HttpResponse response = executeAndCheckHttpRequest(request);
-        return response.getEntity().getContent();
+        if (handleVar != null) {
+            variableCtx.put(handleVar, handle);
+            return resultStream;
+        }
+        return null;
     }
 
     // To execute DDL and Update statements
@@ -619,9 +622,9 @@ public class TestExecutor {
     // create index statement
     // create dataverse statement
     // create function statement
-    public void executeDDL(String str, String url) throws Exception {
+    public void executeDDL(String str, URI uri) throws Exception {
         // Create a method instance.
-        HttpUriRequest request = RequestBuilder.post(url).setEntity(new StringEntity(str, StandardCharsets.UTF_8))
+        HttpUriRequest request = RequestBuilder.post(uri).setEntity(new StringEntity(str, StandardCharsets.UTF_8))
                 .build();
 
         // Execute the method.
@@ -735,9 +738,10 @@ public class TestExecutor {
         executeTest(actualPath, testCaseCtx, pb, isDmlRecoveryTest, null);
     }
 
-    public void executeTest(TestCaseContext testCaseCtx, TestFileContext ctx, String statement,
-            boolean isDmlRecoveryTest, ProcessBuilder pb, CompilationUnit cUnit, MutableInt queryCount,
-            List<TestFileContext> expectedResultFileCtxs, File testFile, String actualPath) throws Exception {
+    public void executeTestFile(TestCaseContext testCaseCtx, TestFileContext ctx, Map<String, Object> variableCtx,
+            String statement, boolean isDmlRecoveryTest, ProcessBuilder pb, CompilationUnit cUnit,
+            MutableInt queryCount, List<TestFileContext> expectedResultFileCtxs, File testFile, String actualPath)
+            throws Exception {
         File qbcFile;
         boolean failed = false;
         File expectedResultFile;
@@ -762,17 +766,11 @@ public class TestExecutor {
                     ResultExtractor.extract(resultStream);
                 }
                 break;
+            case "pollget":
             case "pollquery":
                 // polltimeoutsecs=nnn, polldelaysecs=nnn
-                final Matcher timeoutMatcher = POLL_TIMEOUT_PATTERN.matcher(statement);
-                int timeoutSecs;
-                if (timeoutMatcher.find()) {
-                    timeoutSecs = Integer.parseInt(timeoutMatcher.group(1));
-                } else {
-                    throw new IllegalArgumentException("ERROR: polltimeoutsecs=nnn must be present in poll file");
-                }
-                final Matcher retryDelayMatcher = POLL_DELAY_PATTERN.matcher(statement);
-                int retryDelaySecs = retryDelayMatcher.find() ? Integer.parseInt(timeoutMatcher.group(1)) : 1;
+                int timeoutSecs = getTimeoutSecs(statement);
+                int retryDelaySecs = getRetryDelaySecs(statement);
                 long startTime = System.currentTimeMillis();
                 long limitTime = startTime + TimeUnit.SECONDS.toMillis(timeoutSecs);
                 ctx.setType(ctx.getType().substring("poll".length()));
@@ -780,7 +778,7 @@ public class TestExecutor {
                 LOGGER.fine("polling for up to " + timeoutSecs + " seconds w/ " + retryDelaySecs + " second(s) delay");
                 while (true) {
                     try {
-                        executeTest(testCaseCtx, ctx, statement, isDmlRecoveryTest, pb, cUnit, queryCount,
+                        executeTestFile(testCaseCtx, ctx, variableCtx, statement, isDmlRecoveryTest, pb, cUnit, queryCount,
                                 expectedResultFileCtxs, testFile, actualPath);
                         finalException = null;
                         break;
@@ -800,7 +798,7 @@ public class TestExecutor {
                 break;
             case "query":
             case "async":
-            case "asyncdefer":
+            case "deferred":
                 // isDmlRecoveryTest: insert Crash and Recovery
                 if (isDmlRecoveryTest) {
                     executeScript(pb, pb.environment().get("SCRIPT_HOME") + File.separator + "dml_recovery"
@@ -810,31 +808,43 @@ public class TestExecutor {
                 }
                 InputStream resultStream = null;
                 OutputFormat fmt = OutputFormat.forCompilationUnit(cUnit);
+                final String reqType = ctx.getType();
+                final List<CompilationUnit.Parameter> params = cUnit.getParameter();
                 if (ctx.getFile().getName().endsWith("aql")) {
-                    if (ctx.getType().equalsIgnoreCase("query")) {
-                        resultStream = executeQuery(statement, fmt, getEndpoint(Servlets.AQL_QUERY),
-                                cUnit.getParameter());
-                    } else if (ctx.getType().equalsIgnoreCase("async")) {
-                        resultStream = executeAnyAQLAsync(statement, false, fmt, getEndpoint(Servlets.AQL));
-                    } else if (ctx.getType().equalsIgnoreCase("asyncdefer")) {
-                        resultStream = executeAnyAQLAsync(statement, true, fmt, getEndpoint(Servlets.AQL));
+                    if (reqType.equalsIgnoreCase("query")) {
+                        resultStream = executeQuery(statement, fmt, getEndpoint(Servlets.AQL_QUERY), params);
+                    } else {
+                        final URI endpoint = getEndpoint(Servlets.AQL);
+                        if (reqType.equalsIgnoreCase("async")) {
+                            resultStream = executeAnyAQLAsync(statement, false, fmt, endpoint, variableCtx);
+                        } else if (reqType.equalsIgnoreCase("deferred")) {
+                            resultStream = executeAnyAQLAsync(statement, true, fmt, endpoint, variableCtx);
+                        }
+                        Assert.assertNotNull("no handle for " + reqType + " test " + testFile.toString(), resultStream);
                     }
                 } else {
-                    final String reqType = ctx.getType();
-                    final String url = getEndpoint(Servlets.QUERY_SERVICE);
-                    final List<CompilationUnit.Parameter> params = cUnit.getParameter();
-                    if (reqType.equalsIgnoreCase("query")) {
-                        resultStream = executeQueryService(statement, fmt, url, params, true);
+                    String delivery = DELIVERY_IMMEDIATE;
+                    if (reqType.equalsIgnoreCase("async")) {
+                        delivery = DELIVERY_ASYNC;
+                    } else if (reqType.equalsIgnoreCase("deferred")) {
+                        delivery = DELIVERY_DEFERRED;
+                    }
+                    final URI uri = getEndpoint(Servlets.QUERY_SERVICE);
+                    if (DELIVERY_IMMEDIATE.equals(delivery)) {
+                        resultStream = executeQueryService(statement, fmt, uri, params, true);
                         resultStream = ResultExtractor.extract(resultStream);
-                    } else if (reqType.equalsIgnoreCase("async")) {
-                        resultStream = executeQueryService(statement, fmt, url, params, true, "async");
-                    } else if (reqType.equalsIgnoreCase("asyncdefer")) {
-                        resultStream = executeQueryService(statement, fmt, url, params, true, "deferred");
+                    } else {
+                        String handleVar = getHandleVariable(statement);
+                        setParam(params, "mode", delivery);
+                        resultStream = executeQueryService(statement, fmt, uri, params, true);
+                        String handle = ResultExtractor.extractHandle(resultStream);
+                        Assert.assertNotNull("no handle for " + reqType + " test " + testFile.toString(), handleVar);
+                        variableCtx.put(handleVar, handle);
                     }
                 }
                 if (queryCount.intValue() >= expectedResultFileCtxs.size()) {
-                    throw new IllegalStateException("no result file for " + testFile.toString() + "; queryCount: "
-                            + queryCount + ", filectxs.size: " + expectedResultFileCtxs.size());
+                    Assert.fail("no result file for " + testFile.toString() + "; queryCount: " + queryCount
+                            + ", filectxs.size: " + expectedResultFileCtxs.size());
                 }
                 expectedResultFile = expectedResultFileCtxs.get(queryCount.intValue()).getFile();
 
@@ -938,17 +948,9 @@ public class TestExecutor {
                             "Unexpected format for method " + ctx.getType() + ": " + ctx.extension());
                 }
                 fmt = OutputFormat.forCompilationUnit(cUnit);
-                String endpoint = stripJavaComments(statement).trim();
-                switch (ctx.getType()) {
-                    case "get":
-                        resultStream = executeJSONGet(fmt, "http://" + host + ":" + port + endpoint);
-                        break;
-                    case "post":
-                        resultStream = executeJSONPost(fmt, "http://" + host + ":" + port + endpoint);
-                        break;
-                    default:
-                        throw new IllegalStateException("NYI: " + ctx.getType());
-                }
+                final String trimmedPathAndQuery = stripLineComments(stripJavaComments(statement)).trim();
+                final String variablesReplaced = replaceVarRef(trimmedPathAndQuery, variableCtx);
+                resultStream = executeHttp(ctx.getType(), variablesReplaced, fmt);
                 expectedResultFile = expectedResultFileCtxs.get(queryCount.intValue()).getFile();
                 actualResultFile = testCaseCtx.getActualResultFile(cUnit, expectedResultFile, new File(actualPath));
                 writeOutputToFile(actualResultFile, resultStream);
@@ -1047,11 +1049,56 @@ public class TestExecutor {
         }
     }
 
+    protected int getTimeoutSecs(String statement) {
+        final Matcher timeoutMatcher = POLL_TIMEOUT_PATTERN.matcher(statement);
+        if (timeoutMatcher.find()) {
+            return Integer.parseInt(timeoutMatcher.group(1));
+        } else {
+            throw new IllegalArgumentException("ERROR: polltimeoutsecs=nnn must be present in poll file");
+        }
+    }
+
+    protected static int getRetryDelaySecs(String statement) {
+        final Matcher retryDelayMatcher = POLL_DELAY_PATTERN.matcher(statement);
+        return retryDelayMatcher.find() ? Integer.parseInt(retryDelayMatcher.group(1)) : 1;
+    }
+
+    protected static String getHandleVariable(String statement) {
+        final Matcher handleVariableMatcher = HANDLE_VARIABLE_PATTERN.matcher(statement);
+        return handleVariableMatcher.find() ? handleVariableMatcher.group(1) : null;
+    }
+
+    protected static String replaceVarRef(String statement, Map<String, Object> variableCtx) {
+        String tmpStmt = statement;
+        Matcher variableReferenceMatcher = VARIABLE_REF_PATTERN.matcher(tmpStmt);
+        while (variableReferenceMatcher.find()) {
+            String var = variableReferenceMatcher.group(1);
+            Object value = variableCtx.get(var);
+            Assert.assertNotNull("No value for variable reference $" + var, value);
+            tmpStmt = tmpStmt.replace("$" + var, String.valueOf(value));
+            variableReferenceMatcher = VARIABLE_REF_PATTERN.matcher(tmpStmt);
+        }
+        return tmpStmt;
+    }
+
+    protected InputStream executeHttp(String ctxType, String endpoint, OutputFormat fmt) throws Exception {
+        String[] split = endpoint.split("\\?");
+        URI uri = new URI("http", null, host, port, split[0], split.length > 1 ? split[1] : null, null);
+        switch (ctxType) {
+            case "get":
+                return executeJSONGet(fmt, uri);
+            case "post":
+                return executeJSONPost(fmt, uri);
+            default:
+                throw new AssertionError("Not implemented: " + ctxType);
+        }
+    }
+
     private void killNC(String nodeId, CompilationUnit cUnit) throws Exception {
         //get node process id
         OutputFormat fmt = OutputFormat.forCompilationUnit(cUnit);
         String endpoint = "/admin/cluster/node/" + nodeId + "/config";
-        InputStream executeJSONGet = executeJSONGet(fmt, "http://" + host + ":" + port + endpoint);
+        InputStream executeJSONGet = executeJSONGet(fmt, new URI("http", null, host, port, endpoint, null, null));
         StringWriter actual = new StringWriter();
         IOUtils.copy(executeJSONGet, actual, StandardCharsets.UTF_8);
         String config = actual.toString();
@@ -1065,10 +1112,6 @@ public class TestExecutor {
 
     public void executeTest(String actualPath, TestCaseContext testCaseCtx, ProcessBuilder pb,
             boolean isDmlRecoveryTest, TestGroup failedGroup) throws Exception {
-        File testFile;
-        String statement;
-        List<TestFileContext> expectedResultFileCtxs;
-        List<TestFileContext> testFileCtxs;
         MutableInt queryCount = new MutableInt(0);
         int numOfErrors = 0;
         int numOfFiles = 0;
@@ -1076,14 +1119,15 @@ public class TestExecutor {
         for (CompilationUnit cUnit : cUnits) {
             LOGGER.info(
                     "Starting [TEST]: " + testCaseCtx.getTestCase().getFilePath() + "/" + cUnit.getName() + " ... ");
-            testFileCtxs = testCaseCtx.getTestFiles(cUnit);
-            expectedResultFileCtxs = testCaseCtx.getExpectedResultFiles(cUnit);
+            Map<String, Object> variableCtx = new HashMap<>();
+            List<TestFileContext> testFileCtxs = testCaseCtx.getTestFiles(cUnit);
+            List<TestFileContext> expectedResultFileCtxs = testCaseCtx.getExpectedResultFiles(cUnit);
             for (TestFileContext ctx : testFileCtxs) {
                 numOfFiles++;
-                testFile = ctx.getFile();
-                statement = readTestFile(testFile);
+                final File testFile = ctx.getFile();
+                final String statement = readTestFile(testFile);
                 try {
-                    executeTest(testCaseCtx, ctx, statement, isDmlRecoveryTest, pb, cUnit, queryCount,
+                    executeTestFile(testCaseCtx, ctx, variableCtx, statement, isDmlRecoveryTest, pb, cUnit, queryCount,
                             expectedResultFileCtxs, testFile, actualPath);
                 } catch (Exception e) {
                     System.err.println("testFile " + testFile.toString() + " raised an exception: " + e);
@@ -1140,14 +1184,19 @@ public class TestExecutor {
         return servlet.getPath();
     }
 
-    protected String getEndpoint(Servlets servlet) {
-        return "http://" + host + ":" + port + getPath(servlet).replaceAll("/\\*$", "");
+    protected URI getEndpoint(Servlets servlet) throws URISyntaxException {
+        return new URI("http", null, host, port, getPath(servlet).replaceAll("/\\*$", ""), null, null);
     }
 
     public static String stripJavaComments(String text) {
         return JAVA_BLOCK_COMMENT_PATTERN.matcher(text).replaceAll("");
     }
 
+    public static String stripLineComments(String text) {
+        final String s = SHELL_LINE_COMMENT_PATTERN.matcher(text).replaceAll("");
+        return JAVA_LINE_COMMENT_PATTERN.matcher(s).replaceAll("");
+    }
+
     public void cleanup(String testCase, List<String> badtestcases) throws Exception {
         try {
             ArrayList<String> toBeDropped = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/392bbbc0/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/ManagixSqlppExecutionIT.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/ManagixSqlppExecutionIT.java b/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/ManagixSqlppExecutionIT.java
index 50d103a..2eada38 100644
--- a/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/ManagixSqlppExecutionIT.java
+++ b/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/ManagixSqlppExecutionIT.java
@@ -29,7 +29,7 @@ import org.junit.runners.Parameterized.Parameters;
 @RunWith(Parameterized.class)
 public class ManagixSqlppExecutionIT extends ManagixExecutionIT {
 
-    @Parameters
+    @Parameters(name = "ManagixSqlppExecutionIT {index}: {0}")
     public static Collection<Object[]> tests() throws Exception {
         Collection<Object[]> testArgs = buildTestsInXml("only_sqlpp.xml");
         if (testArgs.size() == 0) {
@@ -48,10 +48,7 @@ public class ManagixSqlppExecutionIT extends ManagixExecutionIT {
 
     }
 
-    private TestCaseContext tcCtx;
-
     public ManagixSqlppExecutionIT(TestCaseContext tcCtx) {
         super(tcCtx);
-        this.tcCtx = tcCtx;
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/392bbbc0/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
index 6eaf4ae..0769596 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
@@ -92,6 +92,7 @@ import org.apache.asterix.om.typecomputer.impl.RecordMergeTypeComputer;
 import org.apache.asterix.om.typecomputer.impl.RecordPairsTypeComputer;
 import org.apache.asterix.om.typecomputer.impl.RecordRemoveFieldsTypeComputer;
 import org.apache.asterix.om.typecomputer.impl.ScalarVersionOfAggregateResultType;
+import org.apache.asterix.om.typecomputer.impl.SleepTypeComputer;
 import org.apache.asterix.om.typecomputer.impl.StringBooleanTypeComputer;
 import org.apache.asterix.om.typecomputer.impl.StringInt32TypeComputer;
 import org.apache.asterix.om.typecomputer.impl.StringIntToStringTypeComputer;
@@ -651,6 +652,8 @@ public class BuiltinFunctions {
             "spatial-cell", 4);
     public static final FunctionIdentifier SWITCH_CASE = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
             "switch-case", FunctionIdentifier.VARARGS);
+    public static final FunctionIdentifier SLEEP = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "sleep", 2);
     public static final FunctionIdentifier INJECT_FAILURE = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
             "inject-failure", 2);
     public static final FunctionIdentifier FLOW_RECORD = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
@@ -1058,6 +1061,7 @@ public class BuiltinFunctions {
         addPrivateFunction(SUBSET_COLLECTION, SubsetCollectionTypeComputer.INSTANCE, true);
         addFunction(SUBSTRING, SubstringTypeComputer.INSTANCE, true);
         addFunction(SWITCH_CASE, SwitchCaseComputer.INSTANCE, true);
+        addFunction(SLEEP, SleepTypeComputer.INSTANCE, false);
         addPrivateFunction(INJECT_FAILURE, InjectFailureTypeComputer.INSTANCE, true);
         addPrivateFunction(CAST_TYPE, CastTypeComputer.INSTANCE, true);
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/392bbbc0/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/InjectFailureTypeComputer.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/InjectFailureTypeComputer.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/InjectFailureTypeComputer.java
index df050be..cc19ac4 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/InjectFailureTypeComputer.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/InjectFailureTypeComputer.java
@@ -21,29 +21,31 @@ package org.apache.asterix.om.typecomputer.impl;
 import org.apache.asterix.om.exceptions.TypeMismatchException;
 import org.apache.asterix.om.typecomputer.base.AbstractResultTypeComputer;
 import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 
+/**
+ * The first argument of INJECT_FAILURE can be any data model instance and will be passed verbatim to the
+ * caller. The second argument is a boolean that determines if the invocation throws an exception.
+ *
+ * Consequently {@link #checkArgType(String, int, IAType)} validates that the second argument is a
+ * boolean and {@link #getResultType(ILogicalExpression, IAType...)} returns the type of the first
+ * argument.
+ */
 public class InjectFailureTypeComputer extends AbstractResultTypeComputer {
 
     public static final InjectFailureTypeComputer INSTANCE = new InjectFailureTypeComputer();
 
-    protected InjectFailureTypeComputer() {
-    }
-
     @Override
     protected void checkArgType(String funcName, int argIndex, IAType type) throws AlgebricksException {
-        ATypeTag actualTypeTag = type.getTypeTag();
-        if (actualTypeTag != ATypeTag.BOOLEAN) {
-            throw new TypeMismatchException(funcName, argIndex, actualTypeTag, ATypeTag.BOOLEAN);
+        if (argIndex == 1 && type.getTypeTag() != ATypeTag.BOOLEAN) {
+            throw new TypeMismatchException(funcName, argIndex, type.getTypeTag(), ATypeTag.BOOLEAN);
         }
     }
 
     @Override
     protected IAType getResultType(ILogicalExpression expr, IAType... strippedInputTypes) throws AlgebricksException {
-        return BuiltinType.ABOOLEAN;
+        return strippedInputTypes[0];
     }
-
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/392bbbc0/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/SleepTypeComputer.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/SleepTypeComputer.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/SleepTypeComputer.java
new file mode 100644
index 0000000..6b885e3
--- /dev/null
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/SleepTypeComputer.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.om.typecomputer.impl;
+
+import org.apache.asterix.om.exceptions.TypeMismatchException;
+import org.apache.asterix.om.typecomputer.base.AbstractResultTypeComputer;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+
+public class SleepTypeComputer extends AbstractResultTypeComputer {
+    public static final SleepTypeComputer INSTANCE = new SleepTypeComputer();
+
+    @Override
+    public void checkArgType(String funcName, int argIndex, IAType type) throws AlgebricksException {
+        if (argIndex == 1) {
+            switch (type.getTypeTag()) {
+                case INT8:
+                case INT16:
+                case INT32:
+                case INT64:
+                    break;
+                default:
+                    throw new TypeMismatchException(funcName, argIndex, type.getTypeTag(), ATypeTag.INT8,
+                            ATypeTag.INT16, ATypeTag.INT32, ATypeTag.INT64);
+            }
+        }
+    }
+
+    @Override
+    public IAType getResultType(ILogicalExpression expr, IAType... types) throws AlgebricksException {
+        return types[0];
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/392bbbc0/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/InjectFailureDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/InjectFailureDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/InjectFailureDescriptor.java
index 164f369..af5f690 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/InjectFailureDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/InjectFailureDescriptor.java
@@ -18,6 +18,9 @@
  */
 package org.apache.asterix.runtime.evaluators.functions;
 
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.dataflow.data.nontagged.serde.ABooleanSerializerDeserializer;
@@ -39,6 +42,9 @@ import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 public class InjectFailureDescriptor extends AbstractScalarFunctionDynamicDescriptor {
 
     private static final long serialVersionUID = 1L;
+
+    private static final Logger LOGGER = Logger.getLogger(InjectFailureDescriptor.class.getSimpleName());
+
     public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
         @Override
         public IFunctionDescriptor createFunctionDescriptor() {
@@ -75,6 +81,7 @@ public class InjectFailureDescriptor extends AbstractScalarFunctionDynamicDescri
                             boolean argResult = ABooleanSerializerDeserializer.getBoolean(argPtr.getByteArray(),
                                     argPtr.getStartOffset() + 1);
                             if (argResult) {
+                                LOGGER.log(Level.SEVERE, ctx.getTaskAttemptId() + " injecting failure");
                                 throw new RuntimeDataException(ErrorCode.INJECTED_FAILURE, getIdentifier());
                             }
                         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/392bbbc0/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SleepDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SleepDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SleepDescriptor.java
new file mode 100644
index 0000000..a186b32
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SleepDescriptor.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.evaluators.functions;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import org.apache.asterix.om.types.hierachy.ATypeHierarchy;
+import org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class SleepDescriptor extends AbstractScalarFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOGGER = Logger.getLogger(SleepDescriptor.class.getSimpleName());
+
+    public static final IFunctionDescriptorFactory FACTORY = SleepDescriptor::new;
+
+    @Override
+    public IScalarEvaluatorFactory createEvaluatorFactory(final IScalarEvaluatorFactory[] args) {
+        return new IScalarEvaluatorFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public IScalarEvaluator createScalarEvaluator(final IHyracksTaskContext ctx) throws HyracksDataException {
+                return new IScalarEvaluator() {
+
+                    private IPointable argTime = new VoidPointable();
+                    private final IScalarEvaluator evalValue = args[0].createScalarEvaluator(ctx);
+                    private final IScalarEvaluator evalTime = args[1].createScalarEvaluator(ctx);
+
+                    @Override
+                    public void evaluate(IFrameTupleReference tuple, IPointable result) throws HyracksDataException {
+                        evalValue.evaluate(tuple, result);
+                        evalTime.evaluate(tuple, argTime);
+
+                        final byte[] bytes = argTime.getByteArray();
+                        final int offset = argTime.getStartOffset();
+                        final long time = ATypeHierarchy.getLongValue(getIdentifier().getName(), 1, bytes, offset);
+
+                        try {
+                            if (LOGGER.isLoggable(Level.INFO)) {
+                                LOGGER.log(Level.INFO, ctx.getTaskAttemptId() + " sleeping for " + time + " ms");
+                            }
+                            Thread.sleep(time);
+                        } catch (InterruptedException e) {
+                            Thread.currentThread().interrupt();
+                        } finally {
+                            if (LOGGER.isLoggable(Level.INFO)) {
+                                LOGGER.log(Level.INFO, ctx.getTaskAttemptId() + " done sleeping for " + time + " ms");
+                            }
+                        }
+                    }
+                };
+            }
+        };
+    }
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return BuiltinFunctions.SLEEP;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/392bbbc0/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/SampleLocalClusterIT.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/SampleLocalClusterIT.java b/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/SampleLocalClusterIT.java
index c926ec0..175ecc4 100644
--- a/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/SampleLocalClusterIT.java
+++ b/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/SampleLocalClusterIT.java
@@ -24,6 +24,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.StringWriter;
+import java.net.URI;
 import java.net.URL;
 import java.util.Collections;
 
@@ -89,7 +90,8 @@ public class SampleLocalClusterIT {
     public void test1_sanityQuery() throws Exception {
         TestExecutor testExecutor = new TestExecutor();
         InputStream resultStream = testExecutor.executeQuery("1+1", OutputFormat.ADM,
-                "http://127.0.0.1:19002" + Lets.AQL_QUERY.getPath(), Collections.emptyList());
+                new URI("http", null, "127.0.0.1", 19002, Lets.AQL_QUERY.getPath(), null, null),
+                Collections.emptyList());
         StringWriter sw = new StringWriter();
         IOUtils.copy(resultStream, sw);
         Assert.assertEquals("2", sw.toString().trim());

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/392bbbc0/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/context/TestFileContext.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/context/TestFileContext.java b/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/context/TestFileContext.java
index e41a624..7a53a9e 100644
--- a/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/context/TestFileContext.java
+++ b/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/context/TestFileContext.java
@@ -59,10 +59,16 @@ public class TestFileContext implements Comparable<TestFileContext> {
 
     @Override
     public int compareTo(TestFileContext o) {
-        if (this.seqNum > o.seqNum)
+        if (this.seqNum > o.seqNum) {
             return 1;
-        else if (this.seqNum < o.seqNum)
+        } else if (this.seqNum < o.seqNum) {
             return -1;
+        }
         return 0;
     }
+
+    @Override
+    public String toString() {
+        return String.valueOf(seqNum) + ":" + type + ":" + file;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/392bbbc0/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetDirectoryRecord.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetDirectoryRecord.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetDirectoryRecord.java
index a50e1ee..3165840 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetDirectoryRecord.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetDirectoryRecord.java
@@ -71,17 +71,24 @@ public class DatasetDirectoryRecord implements Serializable {
     }
 
     public void start() {
-        status = Status.RUNNING;
+        updateStatus(Status.RUNNING);
     }
 
     public void writeEOS() {
-        status = Status.SUCCESS;
+        updateStatus(Status.SUCCESS);
     }
 
     public void fail() {
         status = Status.FAILED;
     }
 
+    private void updateStatus(final DatasetDirectoryRecord.Status newStatus) {
+        // FAILED is a stable status
+        if (status != Status.FAILED) {
+            status = newStatus;
+        }
+    }
+
     public Status getStatus() {
         return status;
     }
@@ -99,6 +106,6 @@ public class DatasetDirectoryRecord implements Serializable {
 
     @Override
     public String toString() {
-        return address.toString() + " " + status + (empty ? " (empty)" : "") + (readEOS ? " (EOS)" : "");
+        return String.valueOf(address) + " " + status + (empty ? " (empty)" : "") + (readEOS ? " (EOS)" : "");
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/392bbbc0/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java
index 34ed65c..f29ff4a 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java
@@ -20,9 +20,14 @@ package org.apache.hyracks.api.dataset;
 
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
-public class DatasetJobRecord extends HashMap<ResultSetId, ResultSetMetaData> implements IDatasetStateRecord {
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class DatasetJobRecord implements IDatasetStateRecord {
     public enum Status {
+        IDLE,
         RUNNING,
         SUCCESS,
         FAILED
@@ -36,20 +41,30 @@ public class DatasetJobRecord extends HashMap<ResultSetId, ResultSetMetaData> im
 
     private List<Exception> exceptions;
 
+    private Map<ResultSetId, ResultSetMetaData> resultSetMetadataMap = new HashMap<>();
+
     public DatasetJobRecord() {
         this.timestamp = System.currentTimeMillis();
-        this.status = Status.RUNNING;
+        this.status = Status.IDLE;
+    }
+
+    private void updateStatus(Status newStatus) {
+        // FAILED is a stable status
+        if (status != Status.FAILED) {
+            status = newStatus;
+        }
     }
 
     public void start() {
-        status = Status.RUNNING;
+        updateStatus(Status.RUNNING);
     }
 
     public void success() {
-        status = Status.SUCCESS;
+        updateStatus(Status.SUCCESS);
     }
 
-    public void fail() {
+    public void fail(ResultSetId rsId, int partition) {
+        getOrCreateDirectoryRecord(rsId, partition).fail();
         status = Status.FAILED;
     }
 
@@ -58,6 +73,7 @@ public class DatasetJobRecord extends HashMap<ResultSetId, ResultSetMetaData> im
         this.exceptions = exceptions;
     }
 
+    @Override
     public long getTimestamp() {
         return timestamp;
     }
@@ -66,7 +82,57 @@ public class DatasetJobRecord extends HashMap<ResultSetId, ResultSetMetaData> im
         return status;
     }
 
+    @Override
+    public String toString() {
+        return resultSetMetadataMap.toString();
+    }
+
     public List<Exception> getExceptions() {
         return exceptions;
     }
+
+    public void setResultSetMetaData(ResultSetId rsId, boolean orderedResult, int nPartitions) throws
+            HyracksDataException {
+        ResultSetMetaData rsMd = resultSetMetadataMap.get(rsId);
+        if (rsMd == null) {
+            resultSetMetadataMap.put(rsId, new ResultSetMetaData(nPartitions, orderedResult));
+        } else if (rsMd.getOrderedResult() != orderedResult || rsMd.getRecords().length != nPartitions) {
+            throw HyracksDataException.create(ErrorCode.INCONSISTENT_RESULT_METADATA, rsId.toString());
+        }
+        //TODO(tillw) throwing a HyracksDataException here hangs the execution tests
+    }
+
+    public ResultSetMetaData getResultSetMetaData(ResultSetId rsId) {
+        return resultSetMetadataMap.get(rsId);
+    }
+
+    public synchronized DatasetDirectoryRecord getOrCreateDirectoryRecord(ResultSetId rsId, int partition) {
+        DatasetDirectoryRecord[] records = getResultSetMetaData(rsId).getRecords();
+        if (records[partition] == null) {
+            records[partition] = new DatasetDirectoryRecord();
+        }
+        return records[partition];
+    }
+
+    public synchronized DatasetDirectoryRecord getDirectoryRecord(ResultSetId rsId, int partition) throws
+            HyracksDataException {
+        DatasetDirectoryRecord[] records = getResultSetMetaData(rsId).getRecords();
+        if (records[partition] == null) {
+            throw new HyracksDataException("no record for partition " + partition + " of result set " + rsId);
+        }
+        return records[partition];
+    }
+
+    public synchronized void updateStatus(ResultSetId rsId) {
+        int successCount = 0;
+        DatasetDirectoryRecord[] records = getResultSetMetaData(rsId).getRecords();
+        for (DatasetDirectoryRecord record : records) {
+            if ((record != null) && (record.getStatus() == DatasetDirectoryRecord.Status.SUCCESS)) {
+                successCount++;
+            }
+        }
+        if (successCount == records.length) {
+            success();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/392bbbc0/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/ResultSetMetaData.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/ResultSetMetaData.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/ResultSetMetaData.java
index 2285981..8e9e3dc 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/ResultSetMetaData.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/ResultSetMetaData.java
@@ -18,14 +18,15 @@
  */
 package org.apache.hyracks.api.dataset;
 
-public class ResultSetMetaData {
-    private final boolean ordered;
+import java.util.Arrays;
 
+public class ResultSetMetaData {
     private final DatasetDirectoryRecord[] records;
+    private final boolean ordered;
 
-    public ResultSetMetaData(boolean ordered, DatasetDirectoryRecord[] records) {
+    ResultSetMetaData(int len, boolean ordered) {
+        this.records = new DatasetDirectoryRecord[len];
         this.ordered = ordered;
-        this.records = records;
     }
 
     public boolean getOrderedResult() {
@@ -35,4 +36,11 @@ public class ResultSetMetaData {
     public DatasetDirectoryRecord[] getRecords() {
         return records;
     }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("{ordered: ").append(ordered).append(", records: ").append(Arrays.toString(records)).append('}');
+        return sb.toString();
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/392bbbc0/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index d094368..35002f8 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -49,6 +49,10 @@ public class ErrorCode {
     public static final int ILLEGAL_WRITE_AFTER_FLUSH_ATTEMPT = 12;
     public static final int DUPLICATE_IODEVICE = 13;
     public static final int NESTED_IODEVICES = 14;
+    public static final int MORE_THAN_ONE_RESULT = 15;
+    public static final int RESULT_FAILURE_EXCEPTION = 16;
+    public static final int RESULT_FAILURE_NO_EXCEPTION = 17;
+    public static final int INCONSISTENT_RESULT_METADATA = 18;
 
     // Compilation error codes.
     public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10001;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/392bbbc0/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
index 0fd6923..404104d 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
@@ -36,6 +36,11 @@ public class HyracksDataException extends HyracksException {
         return new HyracksDataException(ErrorCode.HYRACKS, code, ErrorCode.getErrorMessage(code), cause, params);
     }
 
+    public static HyracksDataException create(HyracksDataException e, String nodeId) {
+        return new HyracksDataException(e.getComponent(), e.getErrorCode(), e.getMessage(), e.getCause(), nodeId, e
+                .getParams());
+    }
+
     public HyracksDataException(String component, int errorCode, String message, Throwable cause, String nodeId,
             Serializable... params) {
         super(component, errorCode, message, cause, nodeId, params);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/392bbbc0/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java
index b1fa494..7969700 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java
@@ -26,6 +26,9 @@ import java.io.Serializable;
 import org.apache.hyracks.api.io.IWritable;
 
 public final class JobId implements IWritable, Serializable {
+
+    public static final JobId INVALID = new JobId(-1l);
+
     private static final long serialVersionUID = 1L;
     private long id;
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/392bbbc0/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index 72f7c65..de58f33 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -33,5 +33,9 @@
 12 = Invalid attempt to write to a flushed append only metadata page
 13 = Duplicate IODevices are not allowed
 14 = IODevices should not be nested within each other
+15 = More than 1 result for job %1$s
+16 = Failure producing result set %1$s for job %2$s
+17 = No exception for failed result set %1$s for job %2$s
+18 = Inconsistent metadata for result set %1$s
 
 10000 = The given rule collection %1$s is not an instance of the List class.

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/392bbbc0/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index ae0f361..37c4177 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -325,10 +325,14 @@ public class ClusterControllerService implements IControllerService {
         return workQueue;
     }
 
-    public Executor getExecutor() {
+    public ExecutorService getExecutorService() {
         return executor;
     }
 
+    public Executor getExecutor() {
+        return getExecutorService();
+    }
+
     public CCConfig getConfig() {
         return ccConfig;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/392bbbc0/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
index 4d7d1c3..46a173e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -25,6 +25,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.dataset.DatasetDirectoryRecord;
@@ -33,6 +35,7 @@ import org.apache.hyracks.api.dataset.DatasetJobRecord.Status;
 import org.apache.hyracks.api.dataset.IDatasetStateRecord;
 import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.hyracks.api.dataset.ResultSetMetaData;
+import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
@@ -48,6 +51,9 @@ import org.apache.hyracks.control.common.work.IResultCallback;
  * job.
  */
 public class DatasetDirectoryService implements IDatasetDirectoryService {
+
+    private static final Logger LOGGER = Logger.getLogger(DatasetDirectoryService.class.getName());
+
     private final long resultTTL;
 
     private final long resultSweepThreshold;
@@ -62,22 +68,24 @@ public class DatasetDirectoryService implements IDatasetDirectoryService {
 
     @Override
     public void init(ExecutorService executor) {
-        executor.execute(new ResultStateSweeper(this, resultTTL, resultSweepThreshold));
+        executor.execute(new ResultStateSweeper(this, resultTTL, resultSweepThreshold, LOGGER));
     }
 
     @Override
     public synchronized void notifyJobCreation(JobId jobId, IActivityClusterGraphGeneratorFactory acggf)
             throws HyracksException {
-        DatasetJobRecord djr = getDatasetJobRecord(jobId);
-        if (djr == null) {
-            djr = new DatasetJobRecord();
-            jobResultLocations.put(jobId, new JobResultInfo(djr, null));
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info(getClass().getSimpleName() + " notified of new job " + jobId);
+        }
+        if (jobResultLocations.get(jobId) != null) {
+            throw HyracksDataException.create(ErrorCode.MORE_THAN_ONE_RESULT, jobId);
         }
+        jobResultLocations.put(jobId, new JobResultInfo(new DatasetJobRecord(), null));
     }
 
     @Override
     public void notifyJobStart(JobId jobId) throws HyracksException {
-        // Auto-generated method stub
+        jobResultLocations.get(jobId).getRecord().start();
     }
 
     @Override
@@ -87,35 +95,36 @@ public class DatasetDirectoryService implements IDatasetDirectoryService {
 
     private DatasetJobRecord getDatasetJobRecord(JobId jobId) {
         final JobResultInfo jri = jobResultLocations.get(jobId);
-        return jri == null ? null : jri.record;
+        return jri == null ? null : jri.getRecord();
     }
 
-    @Override
-    public synchronized void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult,
-            boolean emptyResult, int partition, int nPartitions, NetworkAddress networkAddress) {
-        DatasetJobRecord djr = getDatasetJobRecord(jobId);
-
-        ResultSetMetaData resultSetMetaData = djr.get(rsId);
-        if (resultSetMetaData == null) {
-            resultSetMetaData = new ResultSetMetaData(orderedResult, new DatasetDirectoryRecord[nPartitions]);
-            djr.put(rsId, resultSetMetaData);
-        }
-
-        DatasetDirectoryRecord[] records = resultSetMetaData.getRecords();
-        if (records[partition] == null) {
-            records[partition] = new DatasetDirectoryRecord();
+    private DatasetJobRecord getNonNullDatasetJobRecord(JobId jobId) {
+        final DatasetJobRecord djr = getDatasetJobRecord(jobId);
+        if (djr == null) {
+            throw new NullPointerException();
         }
-        records[partition].setNetworkAddress(networkAddress);
-        records[partition].setEmpty(emptyResult);
-        records[partition].start();
+        return djr;
+    }
 
-        Waiters waiters = jobResultLocations.get(jobId).waiters;
-        Waiter waiter = waiters != null ? waiters.get(rsId) : null;
+    @Override
+    public synchronized void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult,
+            boolean emptyResult, int partition, int nPartitions, NetworkAddress networkAddress) throws
+            HyracksDataException {
+        DatasetJobRecord djr = getNonNullDatasetJobRecord(jobId);
+        djr.setResultSetMetaData(rsId, orderedResult, nPartitions);
+        DatasetDirectoryRecord record = djr.getOrCreateDirectoryRecord(rsId, partition);
+
+        record.setNetworkAddress(networkAddress);
+        record.setEmpty(emptyResult);
+        record.start();
+
+        final JobResultInfo jobResultInfo = jobResultLocations.get(jobId);
+        Waiter waiter = jobResultInfo.getWaiter(rsId);
         if (waiter != null) {
             try {
                 DatasetDirectoryRecord[] updatedRecords = updatedRecords(jobId, rsId, waiter.knownRecords);
                 if (updatedRecords != null) {
-                    waiters.remove(rsId);
+                    jobResultInfo.removeWaiter(rsId);
                     waiter.callback.setValue(updatedRecords);
                 }
             } catch (Exception e) {
@@ -126,51 +135,28 @@ public class DatasetDirectoryService implements IDatasetDirectoryService {
     }
 
     @Override
-    public synchronized void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) {
-        int successCount = 0;
-
-        DatasetJobRecord djr = getDatasetJobRecord(jobId);
-        ResultSetMetaData resultSetMetaData = djr.get(rsId);
-        DatasetDirectoryRecord[] records = resultSetMetaData.getRecords();
-        records[partition].writeEOS();
-
-        for (DatasetDirectoryRecord record : records) {
-            if ((record != null) && (record.getStatus() == DatasetDirectoryRecord.Status.SUCCESS)) {
-                successCount++;
-            }
-        }
-        if (successCount == records.length) {
-            djr.success();
-        }
+    public synchronized void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition)
+            throws HyracksDataException {
+        DatasetJobRecord djr = getNonNullDatasetJobRecord(jobId);
+        djr.getDirectoryRecord(rsId, partition).writeEOS();
+        djr.updateStatus(rsId);
         notifyAll();
     }
 
     @Override
     public synchronized void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition) {
-        DatasetJobRecord djr = getDatasetJobRecord(jobId);
-        if (djr != null) {
-            djr.fail();
-        }
-        final Waiters waiters = jobResultLocations.get(jobId).waiters;
-        if (waiters != null) {
-            waiters.get(rsId).callback.setException(new Exception());
-            waiters.remove(rsId);
-        }
+        DatasetJobRecord djr = getNonNullDatasetJobRecord(jobId);
+        djr.fail(rsId, partition);
+        jobResultLocations.get(jobId).setException(new Exception());
         notifyAll();
     }
 
     @Override
     public synchronized void reportJobFailure(JobId jobId, List<Exception> exceptions) {
-        DatasetJobRecord djr = getDatasetJobRecord(jobId);
-        if (djr != null) {
-            djr.fail(exceptions);
-        }
-        final Waiters waiters = jobResultLocations.get(jobId).waiters;
-        if (waiters != null) {
-            for (ResultSetId rsId : waiters.keySet()) {
-                waiters.remove(rsId).callback.setException(exceptions.get(0));
-            }
-        }
+        DatasetJobRecord djr = getNonNullDatasetJobRecord(jobId);
+        djr.fail(exceptions);
+        // TODO(tillw) throwing an NPE here hangs the system, why?
+        jobResultLocations.get(jobId).setException(exceptions.isEmpty() ? null : exceptions.get(0));
         notifyAll();
     }
 
@@ -184,7 +170,6 @@ public class DatasetDirectoryService implements IDatasetDirectoryService {
                 throw new HyracksDataException(e);
             }
         }
-
         return djr.getStatus();
     }
 
@@ -195,7 +180,7 @@ public class DatasetDirectoryService implements IDatasetDirectoryService {
 
     @Override
     public IDatasetStateRecord getState(JobId jobId) {
-        return jobResultLocations.get(jobId).record;
+        return getDatasetJobRecord(jobId);
     }
 
     @Override
@@ -210,20 +195,7 @@ public class DatasetDirectoryService implements IDatasetDirectoryService {
             throws HyracksDataException {
         DatasetDirectoryRecord[] updatedRecords = updatedRecords(jobId, rsId, knownRecords);
         if (updatedRecords == null) {
-            JobResultInfo jri = jobResultLocations.get(jobId);
-            Waiters waiters;
-            if (jri == null) {
-                waiters = new Waiters();
-                jri = new JobResultInfo(null, waiters);
-                jobResultLocations.put(jobId, jri);
-            } else {
-                waiters = jri.waiters;
-                if (waiters == null) {
-                    waiters = new Waiters();
-                    jri.waiters = waiters;
-                }
-            }
-            waiters.put(rsId, new Waiter(knownRecords, callback));
+            jobResultLocations.get(jobId).addWaiter(rsId, knownRecords, callback);
         } else {
             callback.setValue(updatedRecords);
         }
@@ -248,26 +220,25 @@ public class DatasetDirectoryService implements IDatasetDirectoryService {
      */
     private DatasetDirectoryRecord[] updatedRecords(JobId jobId, ResultSetId rsId, DatasetDirectoryRecord[] knownRecords)
             throws HyracksDataException {
-        DatasetJobRecord djr = getDatasetJobRecord(jobId);
-
-        if (djr == null) {
-            throw new HyracksDataException("Requested JobId " + jobId + " doesn't exist");
-        }
+        DatasetJobRecord djr = getNonNullDatasetJobRecord(jobId);
 
         if (djr.getStatus() == Status.FAILED) {
             List<Exception> caughtExceptions = djr.getExceptions();
-            if (caughtExceptions == null) {
-                throw new HyracksDataException("Job failed.");
+            if (caughtExceptions != null && !caughtExceptions.isEmpty()) {
+                final Exception cause = caughtExceptions.get(caughtExceptions.size() - 1);
+                if (cause instanceof HyracksDataException) {
+                    throw (HyracksDataException) cause;
+                }
+                throw HyracksDataException.create(ErrorCode.RESULT_FAILURE_EXCEPTION, cause, rsId, jobId);
             } else {
-                throw new HyracksDataException(caughtExceptions.get(caughtExceptions.size() - 1));
+                throw HyracksDataException.create(ErrorCode.RESULT_FAILURE_NO_EXCEPTION, rsId, jobId);
             }
         }
 
-        ResultSetMetaData resultSetMetaData = djr.get(rsId);
-        if (resultSetMetaData == null || resultSetMetaData.getRecords() == null) {
+        final ResultSetMetaData resultSetMetaData = djr.getResultSetMetaData(rsId);
+        if (resultSetMetaData == null) {
             return null;
         }
-
         DatasetDirectoryRecord[] records = resultSetMetaData.getRecords();
 
         return Arrays.equals(records, knownRecords) ? null : records;
@@ -275,13 +246,42 @@ public class DatasetDirectoryService implements IDatasetDirectoryService {
 }
 
 class JobResultInfo {
+
+    private DatasetJobRecord record;
+    private Waiters waiters;
+
     JobResultInfo(DatasetJobRecord record, Waiters waiters) {
         this.record = record;
         this.waiters = waiters;
     }
 
-    DatasetJobRecord record;
-    Waiters waiters;
+    DatasetJobRecord getRecord() {
+        return record;
+    }
+
+    void addWaiter(ResultSetId rsId, DatasetDirectoryRecord[] knownRecords,
+            IResultCallback<DatasetDirectoryRecord[]> callback) {
+        if (waiters == null) {
+            waiters = new Waiters();
+        }
+        waiters.put(rsId, new Waiter(knownRecords, callback));
+    }
+
+    Waiter removeWaiter(ResultSetId rsId) {
+        return waiters.remove(rsId);
+    }
+
+    Waiter getWaiter(ResultSetId rsId) {
+        return waiters != null ? waiters.get(rsId) : null;
+    }
+
+    void setException(Exception exception) {
+        if (waiters != null) {
+            for (ResultSetId rsId : waiters.keySet()) {
+                waiters.remove(rsId).callback.setException(exception);
+            }
+        }
+    }
 }
 
 class Waiters extends HashMap<ResultSetId, Waiter> {
@@ -289,11 +289,11 @@ class Waiters extends HashMap<ResultSetId, Waiter> {
 }
 
 class Waiter {
+    DatasetDirectoryRecord[] knownRecords;
+    IResultCallback<DatasetDirectoryRecord[]> callback;
+
     Waiter(DatasetDirectoryRecord[] knownRecords, IResultCallback<DatasetDirectoryRecord[]> callback) {
         this.knownRecords = knownRecords;
         this.callback = callback;
     }
-
-    DatasetDirectoryRecord[] knownRecords;
-    IResultCallback<DatasetDirectoryRecord[]> callback;
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/392bbbc0/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/IDatasetDirectoryService.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/IDatasetDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/IDatasetDirectoryService.java
index 9e4e03e..663a53a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/IDatasetDirectoryService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/IDatasetDirectoryService.java
@@ -35,9 +35,11 @@ public interface IDatasetDirectoryService extends IJobLifecycleListener, IDatase
     public void init(ExecutorService executor);
 
     public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult,
-            boolean emptyResult, int partition, int nPartitions, NetworkAddress networkAddress);
+            boolean emptyResult, int partition, int nPartitions, NetworkAddress networkAddress)
+            throws HyracksDataException;
 
-    public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition);
+    public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition)
+            throws HyracksDataException;
 
     public void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition);
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/392bbbc0/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
index 4e4732d..f51dd06 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
@@ -20,6 +20,7 @@ package org.apache.hyracks.control.cc.work;
 
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.common.work.AbstractWork;
@@ -42,7 +43,8 @@ public class RegisterResultPartitionLocationWork extends AbstractWork {
     private final NetworkAddress networkAddress;
 
     public RegisterResultPartitionLocationWork(ClusterControllerService ccs, JobId jobId, ResultSetId rsId,
-            boolean orderedResult, boolean emptyResult, int partition, int nPartitions, NetworkAddress networkAddress) {
+            boolean orderedResult, boolean emptyResult, int partition, int nPartitions, NetworkAddress
+            networkAddress) {
         this.ccs = ccs;
         this.jobId = jobId;
         this.rsId = rsId;
@@ -55,8 +57,13 @@ public class RegisterResultPartitionLocationWork extends AbstractWork {
 
     @Override
     public void run() {
-        ccs.getDatasetDirectoryService().registerResultPartitionLocation(jobId, rsId, orderedResult, emptyResult,
-                partition, nPartitions, networkAddress);
+        try {
+            ccs.getDatasetDirectoryService()
+                    .registerResultPartitionLocation(jobId, rsId, orderedResult, emptyResult, partition, nPartitions,
+                            networkAddress);
+        } catch (HyracksDataException e) {
+            throw new RuntimeException(e);
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/392bbbc0/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportResultPartitionWriteCompletionWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportResultPartitionWriteCompletionWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportResultPartitionWriteCompletionWork.java
index ffae76a..d63bc8a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportResultPartitionWriteCompletionWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportResultPartitionWriteCompletionWork.java
@@ -19,6 +19,7 @@
 package org.apache.hyracks.control.cc.work;
 
 import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.common.work.AbstractWork;
@@ -42,7 +43,11 @@ public class ReportResultPartitionWriteCompletionWork extends AbstractWork {
 
     @Override
     public void run() {
-        ccs.getDatasetDirectoryService().reportResultPartitionWriteCompletion(jobId, rsId, partition);
+        try {
+            ccs.getDatasetDirectoryService().reportResultPartitionWriteCompletion(jobId, rsId, partition);
+        } catch (HyracksDataException e) {
+            throw new RuntimeException(e);
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/392bbbc0/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java
index 150875b..67b87f5 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java
@@ -31,7 +31,6 @@ import org.apache.hyracks.api.job.JobId;
  * Sweeper to clean up the stale result distribution files and result states.
  */
 public class ResultStateSweeper implements Runnable {
-    private static final Logger LOGGER = Logger.getLogger(ResultStateSweeper.class.getName());
 
     private final IDatasetManager datasetManager;
 
@@ -39,12 +38,16 @@ public class ResultStateSweeper implements Runnable {
 
     private final long resultSweepThreshold;
 
+    private final Logger logger;
+
     private final List<JobId> toBeCollected;
 
-    public ResultStateSweeper(IDatasetManager datasetManager, long resultTTL, long resultSweepThreshold) {
+    public ResultStateSweeper(IDatasetManager datasetManager, long resultTTL, long resultSweepThreshold,
+            Logger logger) {
         this.datasetManager = datasetManager;
         this.resultTTL = resultTTL;
         this.resultSweepThreshold = resultSweepThreshold;
+        this.logger = logger;
         toBeCollected = new ArrayList<JobId>();
     }
 
@@ -56,11 +59,10 @@ public class ResultStateSweeper implements Runnable {
                 Thread.sleep(resultSweepThreshold);
                 sweep();
             } catch (InterruptedException e) {
-                LOGGER.severe("Result cleaner thread interrupted, shutting down.");
+                logger.log(Level.SEVERE, "Result cleaner thread interrupted, shutting down.", e);
                 break; // the interrupt was explicit from another thread. This thread should shut down...
             }
         }
-
     }
 
     private void sweep() {
@@ -75,8 +77,8 @@ public class ResultStateSweeper implements Runnable {
                 datasetManager.deinitState(jobId);
             }
         }
-        if (LOGGER.isLoggable(Level.FINER)) {
-            LOGGER.finer("Result state cleanup instance successfully completed.");
+        if (logger.isLoggable(Level.FINER)) {
+            logger.finer("Result state cleanup instance successfully completed.");
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/392bbbc0/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ExceptionUtils.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ExceptionUtils.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ExceptionUtils.java
index 73c680f..3bc549e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ExceptionUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ExceptionUtils.java
@@ -58,8 +58,12 @@ public class ExceptionUtils {
     public static void setNodeIds(Collection<Exception> exceptions, String nodeId) {
         List<Exception> newExceptions = new ArrayList<>();
         for (Exception e : exceptions) {
-            newExceptions.add(
-                    new HyracksDataException(ErrorCode.HYRACKS, ErrorCode.FAILURE_ON_NODE, e.getMessage(), e, nodeId));
+            if (e instanceof HyracksDataException) {
+                newExceptions.add(HyracksDataException.create((HyracksDataException) e, nodeId));
+            } else {
+                newExceptions.add(new HyracksDataException(ErrorCode.HYRACKS, ErrorCode.FAILURE_ON_NODE, e.getMessage(),
+                        e, nodeId));
+            }
         }
         exceptions.clear();
         exceptions.addAll(newExceptions);


Mime
View raw message