asterixdb-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Wail Alkowaileet (Code Review)" <do-not-re...@asterixdb.incubator.apache.org>
Subject Change in asterixdb[master]: AsterixDB Schema Inferencer
Date Mon, 18 Jul 2016 11:03:12 GMT
Wail Alkowaileet has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1003

Change subject: AsterixDB Schema Inferencer
......................................................................

AsterixDB Schema Inferencer

Add schema builder to the clean JSON output format.

Change-Id: Ia6077216ba457a182e8034ed47536fc5f4dcb639
---
M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/SessionConfig.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/APIServlet.java
A asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/QueryResultLocationsAPIServlets.java
A asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/QueryResultSchemaAPIServlet.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/RESTAPIServlet.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/result/ResultUtils.java
M asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
M asterixdb/asterix-app/src/main/resources/webui/querytemplate.html
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/ServletUtil.java
M asterixdb/asterix-om/pom.xml
A asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/clean/schema/AFlatPrinterFactory.java
A asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/clean/schema/AObjectPrinterFactory.java
A asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/clean/schema/AOptionalFieldPrinterFactory.java
A asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/clean/schema/AOrderedlistPrinterFactory.java
A asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/clean/schema/ARecordPrinterFactory.java
A asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/clean/schema/AUnionPrinterFactory.java
A asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/clean/schema/AUnorderedlistPrinterFactory.java
A asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/clean/schema/AbstractPrinterWithSchemaFactory.java
A asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/clean/schema/IPrinterWithSchemaFactory.java
M asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/base/IDataFormat.java
A asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/AqlCleanJSONWithSchemaPrinterFactoryProvider.java
M asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/AListPrinter.java
M asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/ARecordPrinter.java
A asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/json/clean/schema/AListPrinterSchema.java
A asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/json/clean/schema/APrintVisitor.java
A asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/json/clean/schema/ARecordPrinterSchema.java
A asterixdb/asterix-om/src/main/java/org/apache/asterix/om/schema/builder/HeterogeneousTypeComputerLoader.java
A asterixdb/asterix-om/src/main/java/org/apache/asterix/om/schema/builder/SchemaBuilder.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/formats/NonTaggedDataFormat.java
A asterixdb/asterix-schema-spark/pom.xml
A asterixdb/asterix-schema-spark/src/main/java/org/apache/asterix/schema/spark/SparkHeterogeneousTypeComputer.java
A asterixdb/asterix-schema/pom.xml
A asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/message/SchemaMessage.java
A asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/message/SchemaRegisterMessage.java
A asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/node/AbstractNestedSchemaNode.java
A asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/node/AbstractSchemaNode.java
A asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/node/FlatSchemaNode.java
A asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/node/IHeterogeneousTypeComputer.java
A asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/node/ISchemaNode.java
A asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/node/ISchemaNodeType.java
A asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/node/ListSchemaNode.java
A asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/node/RecordSchemaNode.java
A asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/node/pool/NodesObjectPool.java
A asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/node/type/FlatSchemaNodeType.java
A asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/node/type/NestedSchemaNodeType.java
A asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/service/SchemaDircetoryService.java
A asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/service/SchemaReporterService.java
A asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/type/computer/AsterixHeterogeneousTypeComputer.java
A asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/utils/ResultSchemaUtils.java
A asterixdb/asterix-schema/src/main/resources/schema-inferencers.properties
M asterixdb/pom.xml
M build.xml
59 files changed, 4,201 insertions(+), 23 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/03/1003/1

diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index 68fbba4..9fa51e7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -46,9 +46,11 @@
 import org.apache.asterix.lang.common.statement.FunctionDecl;
 import org.apache.asterix.lang.common.statement.Query;
 import org.apache.asterix.metadata.declared.AqlMetadataProvider;
+import org.apache.asterix.om.schema.builder.HeterogeneousTypeComputerLoader;
 import org.apache.asterix.om.util.AsterixAppContextInfo;
 import org.apache.asterix.optimizer.base.RuleCollections;
 import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
+import org.apache.asterix.schema.node.IHeterogeneousTypeComputer;
 import org.apache.asterix.transaction.management.service.transaction.JobIdFactory;
 import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
@@ -310,6 +312,17 @@
             case CLEAN_JSON:
                 builder.setPrinterProvider(format.getCleanJSONPrinterFactoryProvider());
                 break;
+            case CLEAN_JSON_WITH_SCHEMA:
+                if (conf.is(SessionConfig.PRINT_SCHEMA)) {
+                    long schemaId = conf.getSchemaId();
+                    String typeComputerName = conf.getHeterogeneousTypeComputerName();
+                    IHeterogeneousTypeComputer typeComputer = HeterogeneousTypeComputerLoader.INSTANCE
+                            .getTypeComputer(typeComputerName);
+                    builder.setPrinterProvider(
+                            format.getCleanJSONWithSchemaPrinterFactoryProvider(schemaId, typeComputer));
+                } else
+                    builder.setPrinterProvider(format.getCleanJSONPrinterFactoryProvider());
+                break;
             default:
                 throw new RuntimeException("Unexpected OutputFormat!");
         }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index cc50b75..c49b00a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -65,8 +65,8 @@
         ccConfig.clientNetPort = DEFAULT_HYRACKS_CC_CLIENT_PORT;
         ccConfig.clusterNetPort = DEFAULT_HYRACKS_CC_CLUSTER_PORT;
         ccConfig.defaultMaxJobAttempts = 0;
-        ccConfig.resultTTL = 30000;
-        ccConfig.resultSweepThreshold = 1000;
+        ccConfig.resultTTL = 30000000;
+        ccConfig.resultSweepThreshold = 1000000;
         ccConfig.appCCMainClass = CCApplicationEntryPoint.class.getName();
         // ccConfig.useJOL = true;
         cc = new ClusterControllerService(ccConfig);
@@ -83,8 +83,8 @@
             ncConfig1.dataIPAddress = "127.0.0.1";
             ncConfig1.resultIPAddress = "127.0.0.1";
             ncConfig1.nodeId = ncName;
-            ncConfig1.resultTTL = 30000;
-            ncConfig1.resultSweepThreshold = 1000;
+            ncConfig1.resultTTL = 30000000;
+            ncConfig1.resultSweepThreshold = 1000000;
             ncConfig1.appArgs = Arrays.asList("-virtual-NC");
             String tempPath = System.getProperty(IO_DIR_KEY);
             if (tempPath.endsWith(File.separator)) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/SessionConfig.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/SessionConfig.java
index c09b424..99c2a8d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/SessionConfig.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/SessionConfig.java
@@ -44,7 +44,17 @@
         ADM,
         CSV,
         CLEAN_JSON,
-        LOSSLESS_JSON
+        LOSSLESS_JSON,
+        CLEAN_JSON_WITH_SCHEMA
+    };
+
+    /**
+     * Used to specify the output schema format
+     */
+    public enum SchemaType {
+        ADM,
+        PRETTY,
+        DUMMY_JSON
     };
 
     /**
@@ -102,6 +112,11 @@
      */
     public static final String FORMAT_QUOTE_RECORD = "quote-record";
 
+    /**
+     * Print schema flag: print schema for the result
+     */
+    public static final String PRINT_SCHEMA = "print-schema";
+
     public interface ResultDecorator {
         PrintWriter print(PrintWriter pw);
     }
@@ -123,6 +138,16 @@
     // Flags.
     private final Map<String, Boolean> flags;
 
+    // Custom IPrinterFactoryProvider implementation className
+    private final String heterogeneousTypeComputerName;
+
+    // The <jobId, schemaId> pair resolves which job has which schema
+    private long jobId;
+    private long schemaId;
+
+    // Schema type
+    private SchemaType schemaType;
+
     /**
      * Create a SessionConfig object with all default values:
      * - All format flags set to "false".
@@ -138,6 +163,10 @@
      */
     public SessionConfig(PrintWriter out, OutputFormat fmt) {
         this(out, fmt, null, null, true, true, true);
+    }
+
+    public SessionConfig(PrintWriter out, OutputFormat fmt, String heterogeneousTypeComputerName) {
+        this(out, fmt, null, null, true, true, true, heterogeneousTypeComputerName);
     }
 
     public SessionConfig(PrintWriter out, OutputFormat fmt, ResultDecorator preResultDecorator,
@@ -169,6 +198,12 @@
      */
     public SessionConfig(PrintWriter out, OutputFormat fmt, ResultDecorator preResultDecorator,
             ResultDecorator postResultDecorator, boolean optimize, boolean executeQuery, boolean generateJobSpec) {
+        this(out, fmt, preResultDecorator, postResultDecorator, optimize, executeQuery, generateJobSpec, "");
+    }
+
+    public SessionConfig(PrintWriter out, OutputFormat fmt, ResultDecorator preResultDecorator,
+            ResultDecorator postResultDecorator, boolean optimize, boolean executeQuery, boolean generateJobSpec,
+            String heterogeneousTypeComputerName) {
         this.out = out;
         this.fmt = fmt;
         this.preResultDecorator = preResultDecorator;
@@ -177,6 +212,8 @@
         this.executeQuery = executeQuery;
         this.generateJobSpec = generateJobSpec;
         this.flags = new HashMap<String, Boolean>();
+        this.heterogeneousTypeComputerName = heterogeneousTypeComputerName;
+        this.schemaType = SchemaType.PRETTY;
     }
 
     /**
@@ -257,4 +294,45 @@
         Boolean value = flags.get(flag);
         return value == null ? false : value.booleanValue();
     }
+
+    /**
+     * Retrieve the printerFactoryProvider class name to be loaded.
+     */
+    public String getHeterogeneousTypeComputerName() {
+        return heterogeneousTypeComputerName;
+    }
+
+    public void setJobId(long jobId) {
+        this.jobId = jobId;
+    }
+
+    public long getJobId() {
+        return jobId;
+    }
+
+    public long getSchemaId() {
+        return schemaId;
+    }
+
+    public void setSchemaId(long schemaId) {
+        this.schemaId = schemaId;
+    }
+
+    public void setSchemaFormat(String schemaType) {
+        if (schemaType == null) {
+            this.schemaType = SchemaType.PRETTY;
+            return;
+        }
+
+        if (schemaType.equals("ADM"))
+            this.schemaType = SchemaType.ADM;
+        else if (schemaType.equals("DUMMY_JSON"))
+            this.schemaType = SchemaType.DUMMY_JSON;
+        else
+            this.schemaType = SchemaType.PRETTY;
+    }
+
+    public SchemaType getSchemaFormat() {
+        return schemaType;
+    }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/APIServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/APIServlet.java
index 7a47ca9..733a6db 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/APIServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/APIServlet.java
@@ -26,6 +26,7 @@
 import java.io.OutputStream;
 import java.io.PrintWriter;
 import java.util.List;
+import java.util.Set;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -49,11 +50,13 @@
 import org.apache.asterix.lang.common.base.IParserFactory;
 import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.om.schema.builder.HeterogeneousTypeComputerLoader;
 import org.apache.asterix.result.ResultReader;
 import org.apache.asterix.result.ResultUtils;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.dataset.IHyracksDataset;
 import org.apache.hyracks.client.dataset.HyracksDataset;
+import org.json.JSONArray;
 
 public class APIServlet extends HttpServlet {
     private static final long serialVersionUID = 1L;
@@ -79,15 +82,22 @@
         IParserFactory parserFactory = compilationProvider.getParserFactory();
 
         // Output format.
-        OutputFormat format;
+        OutputFormat format = null;
         boolean csv_and_header = false;
         String output = request.getParameter("output-format");
         try {
             format = OutputFormat.valueOf(output);
         } catch (IllegalArgumentException e) {
             LOGGER.info(output + ": unsupported output-format, using " + OutputFormat.CLEAN_JSON + " instead");
-            // Default output format
-            format = OutputFormat.CLEAN_JSON;
+        }
+
+        String schemaInferencer = null;
+        if (format == OutputFormat.CLEAN_JSON) {
+            schemaInferencer = request.getParameter("schema-inferencer");
+            if (schemaInferencer != null && "NO_SCHEMA".equals(schemaInferencer))
+                schemaInferencer = null;
+            else
+                format = OutputFormat.CLEAN_JSON_WITH_SCHEMA;
         }
 
         String query = request.getParameter("query");
@@ -117,7 +127,20 @@
             }
             IParser parser = parserFactory.createParser(query);
             List<Statement> aqlStatements = parser.parse();
-            SessionConfig sessionConfig = new SessionConfig(out, format, true, isSet(executeQuery), true);
+            SessionConfig sessionConfig;
+
+            if (format != OutputFormat.CLEAN_JSON_WITH_SCHEMA) {
+                //No schema
+                sessionConfig = new SessionConfig(out, format, true, isSet(executeQuery), true);
+            } else {
+                //Print with Schema
+                sessionConfig = new SessionConfig(out, format, null, null, true, isSet(executeQuery), true,
+                        schemaInferencer);
+                sessionConfig.setSchemaFormat(request.getParameter("schema-format"));
+                sessionConfig.set(SessionConfig.PRINT_SCHEMA, true);
+
+            }
+
             sessionConfig.set(SessionConfig.FORMAT_HTML, true);
             sessionConfig.set(SessionConfig.FORMAT_CSV_HEADER, csv_and_header);
             sessionConfig.set(SessionConfig.FORMAT_WRAPPER_ARRAY, isSet(wrapperArray));
@@ -149,6 +172,12 @@
         if (requestURI.equals("/")) {
             response.setContentType("text/html");
             resourcePath = "/webui/querytemplate.html";
+        } else if (requestURI.equals("/schema/inferencers")) {
+            //To capture the request from the GUI to print the schema
+            Set<String> names = HeterogeneousTypeComputerLoader.INSTANCE.getNameClassNameMap();
+            JSONArray inferencers = new JSONArray(names);
+            response.getWriter().println(inferencers);
+            return;
         } else {
             resourcePath = requestURI;
         }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/QueryResultLocationsAPIServlets.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/QueryResultLocationsAPIServlets.java
new file mode 100644
index 0000000..f6038e7
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/QueryResultLocationsAPIServlets.java
@@ -0,0 +1,151 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.asterix.api.http.servlet;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+
+import javax.servlet.ServletContext;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.asterix.result.ResultReader;
+import org.apache.asterix.result.ResultUtils;
+import org.apache.hyracks.api.client.HyracksConnection;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.dataset.DatasetDirectoryRecord;
+import org.apache.hyracks.api.dataset.DatasetJobRecord.Status;
+import org.apache.hyracks.api.dataset.IHyracksDataset;
+import org.apache.hyracks.api.dataset.IHyracksDatasetDirectoryServiceConnection;
+import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.client.dataset.HyracksDataset;
+import org.apache.hyracks.client.dataset.HyracksDatasetDirectoryServiceConnection;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+/**
+ * A servlet to get the result locations.
+ */
+public class QueryResultLocationsAPIServlets extends HttpServlet {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final String HYRACKS_CONNECTION_ATTR = "org.apache.asterix.HYRACKS_CONNECTION";
+
+    private static final String HYRACKS_DATASET_ATTR = "org.apache.asterix.HYRACKS_DATASET";
+
+    @Override
+    public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
+        response.setContentType("text/html");
+        response.setCharacterEncoding("utf-8");
+        String strHandle = request.getParameter("handle");
+        PrintWriter out = response.getWriter();
+        ServletContext context = getServletContext();
+        IHyracksClientConnection hcc;
+        IHyracksDataset hds;
+
+        try {
+            HyracksProperties hp = new HyracksProperties();
+            String strIP = hp.getHyracksIPAddress();
+            int port = hp.getHyracksPort();
+
+            synchronized (context) {
+                hcc = (IHyracksClientConnection) context.getAttribute(HYRACKS_CONNECTION_ATTR);
+                if (hcc == null) {
+                    hcc = new HyracksConnection(strIP, port);
+                    context.setAttribute(HYRACKS_CONNECTION_ATTR, hcc);
+                }
+
+                hds = (IHyracksDataset) context.getAttribute(HYRACKS_DATASET_ATTR);
+                if (hds == null) {
+                    hds = new HyracksDataset(hcc, ResultReader.FRAME_SIZE, ResultReader.NUM_READERS);
+                    context.setAttribute(HYRACKS_DATASET_ATTR, hds);
+                }
+            }
+            JSONObject handleObj = new JSONObject(strHandle);
+            JSONArray handle = handleObj.getJSONArray("handle");
+            JobId jobId = new JobId(handle.getLong(0));
+            ResultSetId rsId = new ResultSetId(handle.getLong(1));
+
+            ResultReader resultReader = new ResultReader(hcc, hds);
+            resultReader.open(jobId, rsId);
+
+            JSONObject jsonResponse = new JSONObject();
+            String status;
+            switch (resultReader.getStatus()) {
+                case RUNNING:
+                    status = "RUNNING";
+                    break;
+                case FAILED:
+                    status = "ERROR";
+                    break;
+                case SUCCESS:
+                    status = "SUCCESS";
+                    break;
+                default:
+                    status = "ERROR";
+            }
+
+            DatasetDirectoryRecord[] locs = getResultsLocations(hcc, jobId, rsId);
+            if (resultReader.getStatus() == Status.SUCCESS && isAllReady(locs)) {
+                prepareLocations(jsonResponse, locs);
+            } else if (resultReader.getStatus() == Status.SUCCESS)
+                jsonResponse.put("status", "RUNNING");
+            else
+                jsonResponse.put("status", status);
+
+            out.write(jsonResponse.toString());
+
+        } catch (Exception e) {
+            ResultUtils.webUIErrorHandler(out, e);
+        }
+    }
+
+    private DatasetDirectoryRecord[] getResultsLocations(IHyracksClientConnection hcc, JobId jobId,
+            ResultSetId resultSetId) throws Exception {
+        NetworkAddress ddsAddress = hcc.getDatasetDirectoryServiceInfo();
+        IHyracksDatasetDirectoryServiceConnection datasetDirectoryServiceConnection = new HyracksDatasetDirectoryServiceConnection(
+                ddsAddress.getAddress(), ddsAddress.getPort());
+
+        return datasetDirectoryServiceConnection.getDatasetResultLocations(jobId, resultSetId, null);
+
+    }
+
+    private boolean isAllReady(DatasetDirectoryRecord[] locs) {
+        for (DatasetDirectoryRecord loc : locs) {
+            if (loc == null)
+                return false;
+        }
+
+        return true;
+    }
+
+    private void prepareLocations(JSONObject jsonResponse, DatasetDirectoryRecord[] locs) throws JSONException {
+        JSONArray locsJsonArray = new JSONArray();
+        for (DatasetDirectoryRecord loc : locs) {
+            JSONObject locJsonObject = new JSONObject();
+            locJsonObject.put("address", loc.getNetworkAddress().getAddress());
+            locJsonObject.put("port", loc.getNetworkAddress().getPort());
+            locsJsonArray.put(locJsonObject);
+        }
+
+        jsonResponse.put("locations", locsJsonArray);
+    }
+
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/QueryResultSchemaAPIServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/QueryResultSchemaAPIServlet.java
new file mode 100644
index 0000000..6dde03f
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/QueryResultSchemaAPIServlet.java
@@ -0,0 +1,150 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.asterix.api.http.servlet;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+
+import javax.servlet.ServletContext;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.asterix.result.ResultReader;
+import org.apache.asterix.result.ResultUtils;
+import org.apache.asterix.schema.node.ISchemaNode;
+import org.apache.asterix.schema.service.SchemaDircetoryService;
+import org.apache.asterix.schema.utils.ResultSchemaUtils;
+import org.apache.hyracks.api.client.HyracksConnection;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataset.DatasetJobRecord.Status;
+import org.apache.hyracks.api.dataset.IHyracksDataset;
+import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.client.dataset.HyracksDataset;
+import org.json.JSONArray;
+import org.json.JSONObject;
+
+/**
+ * A servlet for getting the schema.
+ */
+public class QueryResultSchemaAPIServlet extends HttpServlet {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final String HYRACKS_CONNECTION_ATTR = "org.apache.asterix.HYRACKS_CONNECTION";
+
+    private static final String HYRACKS_DATASET_ATTR = "org.apache.asterix.HYRACKS_DATASET";
+
+    @Override
+    public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
+        response.setContentType("text/html");
+        response.setCharacterEncoding("utf-8");
+        String strHandle = request.getParameter("handle");
+        PrintWriter out = response.getWriter();
+        ServletContext context = getServletContext();
+        IHyracksClientConnection hcc;
+        IHyracksDataset hds;
+
+        try {
+            HyracksProperties hp = new HyracksProperties();
+            String strIP = hp.getHyracksIPAddress();
+            int port = hp.getHyracksPort();
+
+            synchronized (context) {
+                hcc = (IHyracksClientConnection) context.getAttribute(HYRACKS_CONNECTION_ATTR);
+                if (hcc == null) {
+                    hcc = new HyracksConnection(strIP, port);
+                    context.setAttribute(HYRACKS_CONNECTION_ATTR, hcc);
+                }
+
+                hds = (IHyracksDataset) context.getAttribute(HYRACKS_DATASET_ATTR);
+                if (hds == null) {
+                    hds = new HyracksDataset(hcc, ResultReader.FRAME_SIZE, ResultReader.NUM_READERS);
+                    context.setAttribute(HYRACKS_DATASET_ATTR, hds);
+                }
+            }
+            JSONObject handleObj = new JSONObject(strHandle);
+            JSONArray handle = handleObj.getJSONArray("handle");
+            JobId jobId = new JobId(handle.getLong(0));
+            ResultSetId rsId = new ResultSetId(handle.getLong(1));
+
+            ResultReader resultReader = new ResultReader(hcc, hds);
+            resultReader.open(jobId, rsId);
+
+            JSONObject jsonResponse = new JSONObject();
+            String status;
+            switch (resultReader.getStatus()) {
+                case RUNNING:
+                    status = "RUNNING";
+                    break;
+                case FAILED:
+                    status = "ERROR";
+                    break;
+                case SUCCESS:
+                    status = "SUCCESS";
+                    break;
+                default:
+                    status = "ERROR";
+            }
+
+            if (resultReader.getStatus() == Status.SUCCESS) {
+                String schemaFormat = request.getParameter("schema-format");
+
+                if (SchemaDircetoryService.INSTANCE.prepareSchemas(jobId.getId())) {
+                    ISchemaNode schema = ResultSchemaUtils
+                            .mergeSchemas(SchemaDircetoryService.INSTANCE.getSchemas(jobId.getId()));
+
+                    if (schemaFormat == null)
+                        schemaFormat = "PRETTY";
+
+                    if (!schemaFormat.equals("ADM_AND_DUMMY_JSON")) {
+                        if (schemaFormat.equals("ADM"))
+                            out.println(schema.toADM());
+                        else if (schemaFormat.equals("DUMMY_JSON")) {
+                            String dummyJson;
+                            JSONArray tuple = new JSONArray();
+                            schema.toDummyJSON(tuple);
+                            dummyJson = tuple.toString();
+                            dummyJson = dummyJson.substring(1, dummyJson.length() - 1);
+                            out.println();
+                        } else
+                            schema.printPretty(out);
+                    } else {
+                        JSONObject schemasOutput = new JSONObject();
+                        schemasOutput.put("ADM", schema.toADM());
+                        JSONArray tuple = new JSONArray();
+                        schema.toDummyJSON(tuple);
+                        schemasOutput.put("DUMMY_JSON", tuple.toString());
+                        out.println(schemasOutput.toString());
+                    }
+                    return;
+
+                } else if (resultReader.getStatus() == Status.SUCCESS)
+                    jsonResponse.put("status", "RUNNING");
+                else
+                    jsonResponse.put("status", status);
+
+                out.write(jsonResponse.toString());
+            } else {
+                out.write("null");
+            }
+
+        } catch (Exception e) {
+            ResultUtils.webUIErrorHandler(out, e);
+        }
+    }
+
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/RESTAPIServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/RESTAPIServlet.java
index 95e55f6..b20e2a7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/RESTAPIServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/RESTAPIServlet.java
@@ -105,7 +105,23 @@
             format = OutputFormat.LOSSLESS_JSON;
         }
 
-        SessionConfig sessionConfig = new SessionConfig(response.getWriter(), format);
+        // "NO_SCHEMA" is treated exactly like an absent "schema-inferencer" parameter.
+        String schemaInferencer = request.getParameter("schema-inferencer");
+        if ("NO_SCHEMA".equals(schemaInferencer)) {
+            schemaInferencer = null;
+        } else if (schemaInferencer != null) {
+            format = OutputFormat.CLEAN_JSON_WITH_SCHEMA;
+        }
+
+        SessionConfig sessionConfig;
+        if (format != OutputFormat.CLEAN_JSON_WITH_SCHEMA) {
+            sessionConfig = new SessionConfig(response.getWriter(), format);
+        } else {
+            // Schema output: pass the inferencer and the requested schema format to the session.
+            sessionConfig = new SessionConfig(response.getWriter(), format, schemaInferencer);
+            sessionConfig.setSchemaFormat(request.getParameter("schema-format"));
+            sessionConfig.set(SessionConfig.PRINT_SCHEMA, true);
+        }
 
         // If it's JSON or ADM, check for the "wrapper-array" flag. Default is
         // "true" for JSON and "false" for ADM. (Not applicable for CSV.)
@@ -135,6 +151,7 @@
                 response.setContentType("application/x-adm");
                 break;
             case CLEAN_JSON:
+            case CLEAN_JSON_WITH_SCHEMA:
                 // No need to reflect "clean-ness" in output type; fall through
             case LOSSLESS_JSON:
                 response.setContentType("application/json");
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
index 295b308..ed1f4c5 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
@@ -148,6 +148,7 @@
 import org.apache.asterix.optimizer.rules.IntroduceSecondaryIndexInsertDeleteRule;
 import org.apache.asterix.result.ResultReader;
 import org.apache.asterix.result.ResultUtils;
+import org.apache.asterix.schema.service.SchemaDircetoryService;
 import org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory;
 import org.apache.asterix.translator.AbstractLangTranslator;
 import org.apache.asterix.translator.CompiledStatements.CompiledConnectFeedStatement;
@@ -385,6 +386,8 @@
                     metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter++));
                     metadataProvider.setResultAsyncMode(
                             resultDelivery == ResultDelivery.ASYNC || resultDelivery == ResultDelivery.ASYNC_DEFERRED);
+                    if (sessionConfig.is(SessionConfig.PRINT_SCHEMA))
+                        sessionConfig.setSchemaId(SchemaDircetoryService.INSTANCE.generateSchemaId());
                     handleQuery(metadataProvider, (Query) stmt, hcc, hdc, resultDelivery, stats);
                     break;
                 }
@@ -2546,6 +2549,12 @@
                 GlobalConfig.ASTERIX_LOGGER.info(compiled.toJSON().toString(1));
                 JobId jobId = JobUtils.runJob(hcc, compiled, false);
 
+                //Map jobId with schemaId
+                if (sessionConfig.is(SessionConfig.PRINT_SCHEMA)) {
+                    SchemaDircetoryService.INSTANCE.setJobSchemaId(jobId.getId(), sessionConfig.getSchemaId());
+                    sessionConfig.setJobId(jobId.getId());
+                }
+
                 JSONObject response = new JSONObject();
                 switch (resultDelivery) {
                     case ASYNC:
@@ -2563,6 +2572,8 @@
                         resultReader.open(jobId, metadataProvider.getResultSetId());
                         ResultUtils.displayResults(resultReader, sessionConfig, stats,
                                 metadataProvider.findOutputRecordType());
+                        if (sessionConfig.is(SessionConfig.PRINT_SCHEMA))
+                            ResultUtils.printSchema(sessionConfig);
                         break;
                     case ASYNC_DEFERRED:
                         handle = new JSONArray();
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index 06ec6bc..efc46e1 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -29,6 +29,8 @@
 import org.apache.asterix.api.http.servlet.FeedServlet;
 import org.apache.asterix.api.http.servlet.QueryAPIServlet;
 import org.apache.asterix.api.http.servlet.QueryResultAPIServlet;
+import org.apache.asterix.api.http.servlet.QueryResultLocationsAPIServlets;
+import org.apache.asterix.api.http.servlet.QueryResultSchemaAPIServlet;
 import org.apache.asterix.api.http.servlet.QueryServiceServlet;
 import org.apache.asterix.api.http.servlet.QueryStatusAPIServlet;
 import org.apache.asterix.api.http.servlet.ShutdownAPIServlet;
@@ -53,6 +55,7 @@
 import org.apache.asterix.metadata.cluster.ClusterManager;
 import org.apache.asterix.om.util.AsterixAppContextInfo;
 import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.asterix.schema.service.SchemaDircetoryService;
 import org.apache.hyracks.api.application.ICCApplicationContext;
 import org.apache.hyracks.api.application.ICCApplicationEntryPoint;
 import org.apache.hyracks.api.client.HyracksConnection;
@@ -116,6 +119,7 @@
 
         ccAppCtx.addClusterLifecycleListener(ClusterLifecycleListener.INSTANCE);
         ccAppCtx.setMessageBroker(messageBroker);
+        SchemaDircetoryService.INSTANCE.setMessageBroker(messageBroker);
     }
 
     @Override
@@ -193,6 +197,12 @@
         context.addServlet(new ServletHolder(new ShutdownAPIServlet()), Servlets.SHUTDOWN.getPath());
         context.addServlet(new ServletHolder(new VersionAPIServlet()), Servlets.VERSION.getPath());
         context.addServlet(new ServletHolder(new ClusterAPIServlet()), Servlets.CLUSTER_STATE.getPath());
+
+        //Asterix Connector services
+        context.addServlet(new ServletHolder(new QueryResultLocationsAPIServlets()),
+                Servlets.QUERY_RESULT_LOCATION.getPath());
+        context.addServlet(new ServletHolder(new QueryResultSchemaAPIServlet()),
+                Servlets.QUERY_RESULT_SCHEMA.getPath());
     }
 
     private void setupFeedServer(AsterixExternalProperties externalProperties) throws Exception {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index 2555b5a..a3112fb 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -42,6 +42,7 @@
 import org.apache.asterix.messaging.NCMessageBroker;
 import org.apache.asterix.metadata.bootstrap.MetadataBootstrap;
 import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.asterix.schema.service.SchemaReporterService;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.asterix.transaction.management.service.recovery.RecoveryManager;
 import org.apache.commons.io.FileUtils;
@@ -108,6 +109,7 @@
         ncApplicationContext.setApplicationObject(runtimeContext);
         messageBroker = new NCMessageBroker((NodeControllerService) ncAppCtx.getControllerService());
         ncApplicationContext.setMessageBroker(messageBroker);
+        SchemaReporterService.INSTANCE.setMessageBroker(messageBroker);
 
         boolean replicationEnabled = AsterixClusterProperties.INSTANCE.isReplicationEnabled();
         boolean autoFailover = AsterixClusterProperties.INSTANCE.isAutoFailoverEnabled();
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
index 9dd4025..ebdbc42 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
@@ -39,6 +39,9 @@
 import org.apache.asterix.common.messaging.api.ICCMessageBroker;
 import org.apache.asterix.external.feed.message.FeedPartitionStartMessage;
 import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.asterix.schema.message.SchemaMessage;
+import org.apache.asterix.schema.message.SchemaRegisterMessage;
+import org.apache.asterix.schema.service.SchemaDircetoryService;
 import org.apache.hyracks.api.messages.IMessage;
 import org.apache.hyracks.api.util.JavaSerializationUtils;
 import org.apache.hyracks.control.cc.ClusterControllerService;
@@ -83,6 +86,12 @@
                 break;
             case FEED_PROVIDER_READY:
                 handleFeedProviderReady(message);
+                break;
+            case SCHEMA_REGISTER_REQUEST:
+                handleSchemaRegisterRequest(message, nodeId);
+                break;
+            case SCHEMA_MESSAGE:
+                handleSchemaMessage(message, nodeId);
                 break;
             default:
                 LOGGER.warning("Unknown message: " + absMessage.getMessageType());
@@ -158,4 +167,14 @@
         PreparePartitionsFailbackResponseMessage msg = (PreparePartitionsFailbackResponseMessage) message;
         AsterixClusterProperties.INSTANCE.processPreparePartitionsFailbackResponse(msg);
     }
+
+    private void handleSchemaRegisterRequest(IMessage message, String nodeId) {
+        // Hand the register request, together with nodeId, to the schema directory service.
+        SchemaDircetoryService.INSTANCE.registerSchemaLocation((SchemaRegisterMessage) message, nodeId);
+    }
+
+    private void handleSchemaMessage(IMessage message, String nodeId) {
+        // nodeId is unused here: only the schema message itself is passed on.
+        SchemaDircetoryService.INSTANCE.populateSchema((SchemaMessage) message);
+    }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/result/ResultUtils.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/result/ResultUtils.java
index 73dd706..0d8aa80 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/result/ResultUtils.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/result/ResultUtils.java
@@ -33,9 +33,13 @@
 
 import org.apache.asterix.api.common.SessionConfig;
 import org.apache.asterix.api.common.SessionConfig.OutputFormat;
+import org.apache.asterix.api.common.SessionConfig.SchemaType;
 import org.apache.asterix.api.http.servlet.APIServlet;
 import org.apache.asterix.common.utils.JSONUtil;
 import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.schema.node.ISchemaNode;
+import org.apache.asterix.schema.service.SchemaDircetoryService;
+import org.apache.asterix.schema.utils.ResultSchemaUtils;
 import org.apache.http.ParseException;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.comm.IFrame;
@@ -185,6 +189,34 @@
         }
     }
 
+    /**
+     * Prints the merged schema of the session's job to the session output, preceded by an HTML
+     * heading and wrapped in a PRE element. Returns without printing anything when
+     * prepareSchemas reports false for the job.
+     *
+     * @param conf session supplying the job id, the schema format and the output writer
+     * @throws Exception whatever schema preparation, merging or serialization throws
+     */
+    public static void printSchema(SessionConfig conf) throws Exception {
+        if (!SchemaDircetoryService.INSTANCE.prepareSchemas(conf.getJobId())) {
+            return;
+        }
+        conf.out().println("<h4>Schema:</h4>");
+        conf.out().println("<PRE>");
+        ISchemaNode merged = ResultSchemaUtils
+                .mergeSchemas(SchemaDircetoryService.INSTANCE.getSchemas(conf.getJobId()));
+        if (conf.getSchemaFormat() == SchemaType.ADM) {
+            conf.out().println(merged.toADM());
+        } else if (conf.getSchemaFormat() == SchemaType.DUMMY_JSON) {
+            JSONArray tuple = new JSONArray();
+            merged.toDummyJSON(tuple);
+            conf.out().println(tuple.toString(2));
+        } else {
+            merged.printPretty(conf.out());
+        }
+        conf.out().println("</PRE>");
+    }
+
     public static JSONObject getErrorResponse(int errorCode, String errorMessage, String errorSummary,
             String errorStackTrace) {
         JSONObject errorResp = new JSONObject();
diff --git a/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml b/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
index 1d693f9..34bc3b2 100644
--- a/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
+++ b/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
@@ -56,7 +56,7 @@
   </property>
   <property>
     <name>compiler.sortmemory</name>
-    <value>327680</value>
+    <value>134217728</value>
   </property>
   <property>
     <name>compiler.groupmemory</name>
diff --git a/asterixdb/asterix-app/src/main/resources/webui/querytemplate.html b/asterixdb/asterix-app/src/main/resources/webui/querytemplate.html
index 9115b89..e4c136c 100644
--- a/asterixdb/asterix-app/src/main/resources/webui/querytemplate.html
+++ b/asterixdb/asterix-app/src/main/resources/webui/querytemplate.html
@@ -38,6 +38,48 @@
 
 <script type="text/javascript">
 $(document).ready(function() {
+	// Inferencer names fetched from the server; stays null until a request has succeeded.
+	var inferencers = null;
+	var inferencerOption = $('#schema-inferencer-option');
+
+	// Start with the schema controls hidden unless an inferencer is already selected.
+	if (inferencerOption.val() == null || inferencerOption.val() == "NO_SCHEMA") {
+		$('#schema-inferencer').hide();
+		$('#schema-format').hide();
+	}
+
+	// The schema format selector is shown only while an inferencer is selected.
+	inferencerOption.on('change', function() {
+		$('#schema-format').toggle($(this).val() != "NO_SCHEMA");
+	});
+
+	$('#output-format-option').on('change', function() {
+		if ($(this).val() != "CLEAN_JSON") {
+			// Reset the selection too: a hidden select is still submitted with the form, so a
+			// stale inferencer value would otherwise be sent along with the other output format.
+			inferencerOption.val("NO_SCHEMA");
+			$('#schema-inferencer').hide();
+			$('#schema-format').hide();
+			return;
+		}
+
+		if (inferencers == null) {
+			$.get("/schema/inferencers", function(data) {
+				// jQuery hands over an already-parsed value when the response is typed as JSON.
+				inferencers = (typeof data === "string") ? JSON.parse(data) : data;
+				// Keep the loop index local, and build each option through jQuery so the name
+				// is set as text/value instead of being spliced into markup.
+				for (var i in inferencers) {
+					inferencerOption.append($('<option>').val(inferencers[i]).text(inferencers[i]));
+				}
+			});
+		}
+
+		$('#schema-inferencer').show();
+		if (inferencerOption.val() != "NO_SCHEMA") {
+			$('#schema-format').show();
+		}
+	});
 
     var optionButtonSize = $('#checkboxes-on').width();
     $('#clear-query-button, #run-btn').width(optionButtonSize);
@@ -227,7 +269,7 @@
                 </select>
               </label>
               <label id="output-format" class="optlabel"> Output Format:<br/>
-                <select name="output-format" class="btn">
+                <select id="output-format-option" name="output-format" class="btn">
                   <option selected value="ADM">ADM</option>
                   <option value="CSV">CSV (no header)</option>
                   <option value="CSV-Header">CSV (with header)</option>
@@ -235,6 +277,18 @@
                   <option value="LOSSLESS_JSON">JSON (lossless)</option>
                 </select>
               </label>
+              <label id="schema-inferencer" class="optlabel"> Schema Inferencer:<br/>
+               <select id="schema-inferencer-option" name="schema-inferencer" class="btn">
+                 <option selected value="NO_SCHEMA">NO SCHEMA</option>
+               </select>
+          	</label>
+          	<label id="schema-format" class="optlabel"> Schema Format:<br/>
+              <select id="schema-format-option" name="schema-format" class="btn">
+                <option selected value="PRETTY">Pretty Print</option>
+                <option value="ADM">ADM</option>
+                <option value="DUMMY_JSON">Dummy JSON</option>
+              </select>
+          	</label>
               <label class="optlabel"><input type="checkbox" name="wrapper-array" value="true" /> Wrap results in outer array</label>
               <label class="checkbox optlabel"><input type="checkbox" name="print-expr-tree" value="true" /> Print parsed expressions</label>
               <label class="checkbox optlabel"><input type="checkbox" name="print-rewritten-expr-tree" value="true" /> Print rewritten expressions</label>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
index fba74e8..34cb9b7 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
@@ -36,7 +36,9 @@
         COMPLETE_FAILBACK_REQUEST,
         COMPLETE_FAILBACK_RESPONSE,
         REPLICA_EVENT,
-        FEED_PROVIDER_READY
+        FEED_PROVIDER_READY,
+        SCHEMA_REGISTER_REQUEST,
+        SCHEMA_MESSAGE
     }
 
     public abstract ApplicationMessageType getMessageType();
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/ServletUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/ServletUtil.java
index b75d16c..fafd544 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/ServletUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/ServletUtil.java
@@ -31,6 +31,8 @@
         SQLPP_DDL("/ddl/sqlpp"),
         QUERY_STATUS("/query/status"),
         QUERY_RESULT("/query/result"),
+        QUERY_RESULT_LOCATION("/query/result/location"),
+        QUERY_RESULT_SCHEMA("/query/result/schema"),
         QUERY_SERVICE("/query/service"),
         CONNECTOR("/connector"),
         SHUTDOWN("/admin/shutdown"),
diff --git a/asterixdb/asterix-om/pom.xml b/asterixdb/asterix-om/pom.xml
index fcd3078..65c2ea8 100644
--- a/asterixdb/asterix-om/pom.xml
+++ b/asterixdb/asterix-om/pom.xml
@@ -52,6 +52,18 @@
       <scope>compile</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.asterix</groupId>
+      <artifactId>asterix-schema</artifactId>
+      <version>0.8.9-SNAPSHOT</version>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.asterix</groupId>
+      <artifactId>asterix-schema-spark</artifactId>
+      <version>0.8.9-SNAPSHOT</version>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hyracks</groupId>
       <artifactId>hyracks-storage-am-lsm-invertedindex</artifactId>
     </dependency>
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/clean/schema/AFlatPrinterFactory.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/clean/schema/AFlatPrinterFactory.java
new file mode 100644
index 0000000..4ae1220
--- /dev/null
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/clean/schema/AFlatPrinterFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.asterix.dataflow.data.nontagged.printers.json.clean.schema;
+
+import org.apache.asterix.om.schema.builder.SchemaBuilder;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.schema.node.IHeterogeneousTypeComputer;
+import org.apache.hyracks.algebricks.data.IPrinter;
+
+public class AFlatPrinterFactory extends AbstractPrinterWithSchemaFactory {
+    // Printer factory whose type tag comes from a fixed IAType instead of being read from the data.
+    private static final long serialVersionUID = 1L;
+    private IAType flatType;
+
+    public AFlatPrinterFactory(IAType flatType) {
+        this.flatType = flatType;
+    }
+    /** Creates a new SchemaBuilder, calls setTuple() and setSchema(tag) on it, and returns the flat printer. */
+    @Override
+    public IPrinter createPrinter() {
+        schemaBuilder = new SchemaBuilder(typeComputer, schemaId);
+        schemaBuilder.setTuple();
+        schemaBuilder.setSchema(flatType.getTypeTag());
+        return AObjectPrinterFactory.getFlatPrinter(flatType.getTypeTag());
+    }
+    /** Replaces the IAType used by createPrinter(); despite the name it takes a type, not a type tag. */
+    public void setTypeTag(IAType flatType) {
+        this.flatType = flatType;
+    }
+    /** Returns the typeComputer field (declared outside this class, presumably in the base factory). */
+    public IHeterogeneousTypeComputer getTypeComputer() {
+        return typeComputer;
+    }
+
+}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/clean/schema/AObjectPrinterFactory.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/clean/schema/AObjectPrinterFactory.java
new file mode 100644
index 0000000..80b1919
--- /dev/null
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/clean/schema/AObjectPrinterFactory.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.dataflow.data.nontagged.printers.json.clean.schema;
+
+import java.io.PrintStream;
+
+import org.apache.asterix.dataflow.data.nontagged.printers.json.clean.ABinaryHexPrinterFactory;
+import org.apache.asterix.dataflow.data.nontagged.printers.json.clean.ABooleanPrinterFactory;
+import org.apache.asterix.dataflow.data.nontagged.printers.json.clean.ACirclePrinterFactory;
+import org.apache.asterix.dataflow.data.nontagged.printers.json.clean.ADatePrinterFactory;
+import org.apache.asterix.dataflow.data.nontagged.printers.json.clean.ADateTimePrinterFactory;
+import org.apache.asterix.dataflow.data.nontagged.printers.json.clean.ADayTimeDurationPrinterFactory;
+import org.apache.asterix.dataflow.data.nontagged.printers.json.clean.ADoublePrinterFactory;
+import org.apache.asterix.dataflow.data.nontagged.printers.json.clean.ADurationPrinterFactory;
+import org.apache.asterix.dataflow.data.nontagged.printers.json.clean.AFloatPrinterFactory;
+import org.apache.asterix.dataflow.data.nontagged.printers.json.clean.AInt16PrinterFactory;
+import org.apache.asterix.dataflow.data.nontagged.printers.json.clean.AInt32PrinterFactory;
+import org.apache.asterix.dataflow.data.nontagged.printers.json.clean.AInt64PrinterFactory;
+import org.apache.asterix.dataflow.data.nontagged.printers.json.clean.AInt8PrinterFactory;
+import org.apache.asterix.dataflow.data.nontagged.printers.json.clean.AIntervalPrinterFactory;
+import org.apache.asterix.dataflow.data.nontagged.printers.json.clean.ALinePrinterFactory;
+import org.apache.asterix.dataflow.data.nontagged.printers.json.clean.ANullPrinterFactory;
+import org.apache.asterix.dataflow.data.nontagged.printers.json.clean.APoint3DPrinterFactory;
+import org.apache.asterix.dataflow.data.nontagged.printers.json.clean.APointPrinterFactory;
+import org.apache.asterix.dataflow.data.nontagged.printers.json.clean.APolygonPrinterFactory;
+import org.apache.asterix.dataflow.data.nontagged.printers.json.clean.ARectanglePrinterFactory;
+import org.apache.asterix.dataflow.data.nontagged.printers.json.clean.AStringPrinterFactory;
+import org.apache.asterix.dataflow.data.nontagged.printers.json.clean.ATimePrinterFactory;
+import org.apache.asterix.dataflow.data.nontagged.printers.json.clean.AUUIDPrinterFactory;
+import org.apache.asterix.dataflow.data.nontagged.printers.json.clean.AYearMonthDurationPrinterFactory;
+import org.apache.asterix.om.schema.builder.SchemaBuilder;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.EnumDeserializer;
+import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
+import org.apache.hyracks.algebricks.data.IPrinter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class AObjectPrinterFactory extends AbstractPrinterWithSchemaFactory {
+
+    private static final long serialVersionUID = 1L;
+    /** Prints a flat value by delegating to the clean-JSON AObjectPrinterFactory; no schema builder is touched. */
+    public static boolean printFlatValue(ATypeTag typeTag, byte[] b, int s, int l, PrintStream ps)
+            throws HyracksDataException {
+        return org.apache.asterix.dataflow.data.nontagged.printers.json.clean.AObjectPrinterFactory
+                .printFlatValue(typeTag, b, s, l, ps);
+    }
+    /** Returns the clean-JSON PRINTER for the given type tag, or null when the tag is not listed below. */
+    public static IPrinter getFlatPrinter(ATypeTag typeTag) {
+        switch (typeTag) {
+            case INT8:
+                return AInt8PrinterFactory.PRINTER;
+            case INT16:
+                return AInt16PrinterFactory.PRINTER;
+            case INT32:
+                return AInt32PrinterFactory.PRINTER;
+            case INT64:
+                return AInt64PrinterFactory.PRINTER;
+            case MISSING:
+            case NULL:
+                return ANullPrinterFactory.PRINTER;
+            case BOOLEAN:
+                return ABooleanPrinterFactory.PRINTER;
+            case FLOAT:
+                return AFloatPrinterFactory.PRINTER;
+            case DOUBLE:
+                return ADoublePrinterFactory.PRINTER;
+            case DATE:
+                return ADatePrinterFactory.PRINTER;
+            case TIME:
+                return ATimePrinterFactory.PRINTER;
+            case DATETIME:
+                return ADateTimePrinterFactory.PRINTER;
+            case DURATION:
+                return ADurationPrinterFactory.PRINTER;
+            case YEARMONTHDURATION:
+                return AYearMonthDurationPrinterFactory.PRINTER;
+            case DAYTIMEDURATION:
+                return ADayTimeDurationPrinterFactory.PRINTER;
+            case INTERVAL:
+                return AIntervalPrinterFactory.PRINTER;
+            case POINT:
+                return APointPrinterFactory.PRINTER;
+            case POINT3D:
+                return APoint3DPrinterFactory.PRINTER;
+            case LINE:
+                return ALinePrinterFactory.PRINTER;
+            case POLYGON:
+                return APolygonPrinterFactory.PRINTER;
+            case CIRCLE:
+                return ACirclePrinterFactory.PRINTER;
+            case RECTANGLE:
+                return ARectanglePrinterFactory.PRINTER;
+            case STRING:
+                return AStringPrinterFactory.PRINTER;
+            case BINARY:
+                return ABinaryHexPrinterFactory.PRINTER;
+            case UUID:
+                return AUUIDPrinterFactory.PRINTER;
+            default:
+                return null;
+        }
+    }
+    /** Creates a printer that reads the type tag from the first byte of each value and dispatches on it. */
+    @Override
+    public IPrinter createPrinter() {
+        return new IPrinter() {
+            // Printers for record and list values; created in init().
+            private IPrinter recordPrinter;
+            private IPrinter orderedlistPrinter;
+            private IPrinter unorderedListPrinter;
+
+            @Override
+            public void init() throws HyracksDataException {
+                ARecordPrinterFactory recordFactory = new ARecordPrinterFactory(null);
+                recordFactory.setTypeComputer(typeComputer);
+                recordFactory.setSchemaId(schemaId);
+                AOrderedlistPrinterFactory orderListFactory = new AOrderedlistPrinterFactory(null);
+                orderListFactory.setTypeComputer(typeComputer);
+                orderListFactory.setSchemaId(schemaId);
+                AUnorderedlistPrinterFactory unorderedListFactory = new AUnorderedlistPrinterFactory(null);
+                unorderedListFactory.setTypeComputer(typeComputer);
+                unorderedListFactory.setSchemaId(schemaId);
+                recordPrinter = recordFactory.createPrinter();
+                orderedlistPrinter = orderListFactory.createPrinter();
+                unorderedListPrinter = unorderedListFactory.createPrinter();
+                schemaBuilder = new SchemaBuilder(typeComputer, schemaId);
+            }
+
+            @Override
+            public void print(byte[] b, int s, int l, PrintStream ps) throws HyracksDataException {
+                ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(b[s]);
+                switch (typeTag) {
+                    case INT8:
+                    case INT16:
+                    case INT32:
+                    case INT64:
+                    case NULL:
+                    case BOOLEAN:
+                    case FLOAT:
+                    case DOUBLE:
+                    case TIME:
+                    case DATE:
+                    case DATETIME:
+                    case DURATION:
+                    case YEARMONTHDURATION:
+                    case DAYTIMEDURATION:
+                    case INTERVAL:
+                    case POINT:
+                    case POINT3D:
+                    case LINE:
+                    case POLYGON:
+                    case CIRCLE:
+                    case RECTANGLE:
+                    case STRING:
+                    case BINARY:
+                    case UUID:
+                        if (!schemaBuilder.isInitialized())
+                            schemaBuilder.setTuple();
+                        schemaBuilder.setSchema(typeTag);
+                        AObjectPrinterFactory.printFlatValue(typeTag, b, s, l, ps);
+                        break;
+                    case RECORD: {
+                        this.recordPrinter.init();
+                        recordPrinter.print(b, s, l, ps);
+                        break;
+                    }
+                    case ORDEREDLIST: {
+                        this.orderedlistPrinter.init();
+                        orderedlistPrinter.print(b, s, l, ps);
+                        break;
+                    }
+                    case UNORDEREDLIST: {
+                        this.unorderedListPrinter.init();
+                        unorderedListPrinter.print(b, s, l, ps);
+                        break;
+                    }
+                    // NOTE(review): no case for MISSING and no default, so such tags print nothing - confirm.
+                    case ANY:
+                    case BITARRAY:
+                    case ENUM:
+                    case SHORTWITHOUTTYPEINFO:
+                    case SPARSERECORD:
+                    case SYSTEM_NULL:
+                    case TYPE:
+                    case UINT16:
+                    case UINT32:
+                    case UINT64:
+                    case UINT8:
+                    case UNION:
+                        throw new NotImplementedException("No printer for type " + typeTag);
+                }
+
+            }
+        };
+    }
+
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/clean/schema/AOptionalFieldPrinterFactory.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/clean/schema/AOptionalFieldPrinterFactory.java
new file mode 100644
index 0000000..5d0a33a
--- /dev/null
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/clean/schema/AOptionalFieldPrinterFactory.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.dataflow.data.nontagged.printers.json.clean.schema;
+
+import java.io.PrintStream;
+
+import org.apache.asterix.formats.nontagged.AqlCleanJSONWithSchemaPrinterFactoryProvider;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.AUnionType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.hyracks.algebricks.data.IPrinter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Printer factory for an optional field described by an {@link AUnionType}: values tagged as null go
+ * to the null printer, all other values go to the printer of the union's actual type.
+ */
+public class AOptionalFieldPrinterFactory extends AbstractPrinterWithSchemaFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private AUnionType unionType;
+
+    public AOptionalFieldPrinterFactory(AUnionType unionType) {
+        this.unionType = unionType;
+    }
+
+    /**
+     * Builds a printer that chooses between the null printer and the actual-type printer per value.
+     */
+    @Override
+    public IPrinter createPrinter() {
+        return new IPrinter() {
+            private IPrinter nullPrinter;
+            private IPrinter fieldPrinter;
+
+            @Override
+            public void init() throws HyracksDataException {
+                // The null printer is created first, then the printer for the actual type.
+                nullPrinter = newPrinter(BuiltinType.ANULL);
+                fieldPrinter = newPrinter(unionType.getActualType());
+            }
+
+            @Override
+            public void print(byte[] b, int s, int l, PrintStream ps) throws HyracksDataException {
+                // The field printer is initialized before every value, whichever printer is then used.
+                fieldPrinter.init();
+                final boolean taggedNull = b[s] == ATypeTag.NULL.serialize();
+                final IPrinter delegate = taggedNull ? nullPrinter : fieldPrinter;
+                delegate.print(b, s, l, ps);
+            }
+
+        };
+    }
+
+    /**
+     * Looks up the schema-aware factory for {@code type} with this factory's schema id and type
+     * computer, and creates a printer from it.
+     */
+    private IPrinter newPrinter(Object type) throws HyracksDataException {
+        final AbstractPrinterWithSchemaFactory factory = (AbstractPrinterWithSchemaFactory) AqlCleanJSONWithSchemaPrinterFactoryProvider
+                .getPrinterFactory(type, schemaId, typeComputer);
+        return factory.createPrinter();
+    }
+
+}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/clean/schema/AOrderedlistPrinterFactory.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/clean/schema/AOrderedlistPrinterFactory.java
new file mode 100644
index 0000000..9132031
--- /dev/null
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/clean/schema/AOrderedlistPrinterFactory.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.dataflow.data.nontagged.printers.json.clean.schema;
+
+import java.io.PrintStream;
+
+import org.apache.asterix.om.pointables.PointableAllocator;
+import org.apache.asterix.om.pointables.base.DefaultOpenFieldType;
+import org.apache.asterix.om.pointables.base.IVisitablePointable;
+import org.apache.asterix.om.pointables.printer.json.clean.schema.APrintVisitor;
+import org.apache.asterix.om.schema.builder.SchemaBuilder;
+import org.apache.asterix.om.types.AOrderedListType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.data.IPrinter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Printer factory for ordered lists; the produced printer walks the list with an
+ * {@link APrintVisitor} that is constructed around a {@link SchemaBuilder}.
+ */
+public class AOrderedlistPrinterFactory extends AbstractPrinterWithSchemaFactory {
+
+    private static final long serialVersionUID = 1L;
+    private AOrderedListType orderedlistType;
+
+    public AOrderedlistPrinterFactory(AOrderedListType orderedlistType) {
+        this.orderedlistType = orderedlistType;
+    }
+
+    /**
+     * Creates a printer around a list pointable from a new allocator, a new schema builder and a
+     * new print visitor. Without a list type, the default open field type for ordered lists is used.
+     */
+    @Override
+    public IPrinter createPrinter() {
+        final PointableAllocator pointableAllocator = new PointableAllocator();
+        final IAType listType;
+        if (orderedlistType != null) {
+            listType = orderedlistType;
+        } else {
+            listType = DefaultOpenFieldType.getDefaultOpenFieldType(ATypeTag.ORDEREDLIST);
+        }
+        final IVisitablePointable listPointable = pointableAllocator.allocateListValue(listType);
+        // The new builder is also stored in the inherited schemaBuilder field.
+        final SchemaBuilder builder = new SchemaBuilder(typeComputer, schemaId);
+        schemaBuilder = builder;
+        final APrintVisitor visitor = new APrintVisitor(builder);
+        final Pair<PrintStream, ATypeTag> visitorArg = new Pair<>(null, null);
+
+        return new IPrinter() {
+
+            @Override
+            public void init() throws HyracksDataException {
+                visitorArg.second = listType.getTypeTag();
+            }
+
+            @Override
+            public void print(byte[] b, int start, int l, PrintStream ps) throws HyracksDataException {
+                try {
+                    listPointable.set(b, start, l);
+                    visitorArg.first = ps;
+                    listPointable.accept(visitor, visitorArg);
+                } catch (Exception e) {
+                    // Every failure while visiting the list surfaces as a HyracksDataException.
+                    throw new HyracksDataException(e);
+                }
+            }
+        };
+    }
+
+}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/clean/schema/ARecordPrinterFactory.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/clean/schema/ARecordPrinterFactory.java
new file mode 100644
index 0000000..09ac0ae
--- /dev/null
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/clean/schema/ARecordPrinterFactory.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.dataflow.data.nontagged.printers.json.clean.schema;
+
+import java.io.PrintStream;
+
+import org.apache.asterix.om.pointables.PointableAllocator;
+import org.apache.asterix.om.pointables.base.DefaultOpenFieldType;
+import org.apache.asterix.om.pointables.base.IVisitablePointable;
+import org.apache.asterix.om.pointables.printer.json.clean.schema.APrintVisitor;
+import org.apache.asterix.om.schema.builder.SchemaBuilder;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.data.IPrinter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Printer factory for records; the produced printer walks the record with an
+ * {@link APrintVisitor} that is constructed around a {@link SchemaBuilder}.
+ */
+public class ARecordPrinterFactory extends AbstractPrinterWithSchemaFactory {
+
+    private static final long serialVersionUID = 1L;
+    private final ARecordType recType;
+
+    public ARecordPrinterFactory(ARecordType recType) {
+        this.recType = recType;
+    }
+
+    /**
+     * Creates a printer around a record pointable from a new allocator, a new schema builder and a
+     * new print visitor. Without a record type, the default open field type for records is used.
+     */
+    @Override
+    public IPrinter createPrinter() {
+        final PointableAllocator pointableAllocator = new PointableAllocator();
+        final IAType recordType;
+        if (recType != null) {
+            recordType = recType;
+        } else {
+            recordType = DefaultOpenFieldType.getDefaultOpenFieldType(ATypeTag.RECORD);
+        }
+        final IVisitablePointable recordPointable = pointableAllocator.allocateRecordValue(recordType);
+        // The new builder is also stored in the inherited schemaBuilder field.
+        final SchemaBuilder builder = new SchemaBuilder(typeComputer, schemaId);
+        schemaBuilder = builder;
+        final APrintVisitor visitor = new APrintVisitor(builder);
+        final Pair<PrintStream, ATypeTag> visitorArg = new Pair<>(null, null);
+
+        return new IPrinter() {
+
+            @Override
+            public void init() throws HyracksDataException {
+                visitorArg.second = recordType.getTypeTag();
+            }
+
+            @Override
+            public void print(byte[] b, int start, int l, PrintStream ps) throws HyracksDataException {
+                try {
+                    recordPointable.set(b, start, l);
+                    visitorArg.first = ps;
+                    recordPointable.accept(visitor, visitorArg);
+                } catch (Exception e) {
+                    // Every failure while visiting the record surfaces as a HyracksDataException.
+                    throw new HyracksDataException(e);
+                }
+            }
+        };
+    }
+}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/clean/schema/AUnionPrinterFactory.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/clean/schema/AUnionPrinterFactory.java
new file mode 100644
index 0000000..7efc232
--- /dev/null
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/clean/schema/AUnionPrinterFactory.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.dataflow.data.nontagged.printers.json.clean.schema;
+
+import java.io.PrintStream;
+import java.util.List;
+
+import org.apache.asterix.formats.nontagged.AqlCleanJSONWithSchemaPrinterFactoryProvider;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.AUnionType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.algebricks.data.IPrinter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Printer factory for general union types: the produced printer holds one printer per entry of
+ * the union list and picks one of them for each value.
+ */
+public class AUnionPrinterFactory extends AbstractPrinterWithSchemaFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private AUnionType unionType;
+
+    public AUnionPrinterFactory(AUnionType unionType) {
+        this.unionType = unionType;
+    }
+
+    @Override
+    public IPrinter createPrinter() {
+
+        return new IPrinter() {
+
+            private IPrinter[] printers;
+            private List<IAType> unionList;
+
+            /**
+             * Creates and initializes one printer for every type in the union list.
+             */
+            @Override
+            public void init() throws HyracksDataException {
+                unionList = unionType.getUnionList();
+                printers = new IPrinter[unionList.size()];
+                for (int idx = 0; idx < printers.length; idx++) {
+                    final AbstractPrinterWithSchemaFactory factory = (AbstractPrinterWithSchemaFactory) AqlCleanJSONWithSchemaPrinterFactoryProvider
+                            .getPrinterFactory(unionList.get(idx), schemaId, typeComputer);
+                    final IPrinter memberPrinter = factory.createPrinter();
+                    printers[idx] = memberPrinter;
+                    memberPrinter.init();
+                }
+            }
+
+            /**
+             * Uses the byte at {@code s + 1} as the index of the union entry to print with.
+             */
+            @Override
+            public void print(byte[] b, int s, int l, PrintStream ps) throws HyracksDataException {
+                final int member = b[s + 1];
+                final ATypeTag tag = unionList.get(member).getTypeTag();
+                // An entry of type ANY is printed from s + 2; every other entry from s + 1.
+                final int valueStart = (tag == ATypeTag.ANY) ? s + 2 : s + 1;
+                printers[member].print(b, valueStart, l, ps);
+            }
+        };
+    }
+
+}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/clean/schema/AUnorderedlistPrinterFactory.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/clean/schema/AUnorderedlistPrinterFactory.java
new file mode 100644
index 0000000..00153eb
--- /dev/null
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/clean/schema/AUnorderedlistPrinterFactory.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.dataflow.data.nontagged.printers.json.clean.schema;
+
+import java.io.PrintStream;
+
+import org.apache.asterix.om.pointables.PointableAllocator;
+import org.apache.asterix.om.pointables.base.DefaultOpenFieldType;
+import org.apache.asterix.om.pointables.base.IVisitablePointable;
+import org.apache.asterix.om.pointables.printer.json.clean.schema.APrintVisitor;
+import org.apache.asterix.om.schema.builder.SchemaBuilder;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.AUnorderedListType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.data.IPrinter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Printer factory for unordered lists; the produced printer walks the list with an
+ * {@link APrintVisitor} that is constructed around a {@link SchemaBuilder}.
+ */
+public class AUnorderedlistPrinterFactory extends AbstractPrinterWithSchemaFactory {
+
+    private static final long serialVersionUID = 1L;
+    private AUnorderedListType unorderedlistType;
+
+    public AUnorderedlistPrinterFactory(AUnorderedListType unorderedlistType) {
+        this.unorderedlistType = unorderedlistType;
+    }
+
+    /**
+     * Creates a printer around a list pointable from a new allocator, a new schema builder and a
+     * new print visitor. Without a list type, the default open field type for unordered lists is used.
+     */
+    @Override
+    public IPrinter createPrinter() {
+        final PointableAllocator pointableAllocator = new PointableAllocator();
+        final IAType listType;
+        if (unorderedlistType != null) {
+            listType = unorderedlistType;
+        } else {
+            listType = DefaultOpenFieldType.getDefaultOpenFieldType(ATypeTag.UNORDEREDLIST);
+        }
+        final IVisitablePointable listPointable = pointableAllocator.allocateListValue(listType);
+        // The new builder is also stored in the inherited schemaBuilder field.
+        final SchemaBuilder builder = new SchemaBuilder(typeComputer, schemaId);
+        schemaBuilder = builder;
+        final APrintVisitor visitor = new APrintVisitor(builder);
+        final Pair<PrintStream, ATypeTag> visitorArg = new Pair<>(null, null);
+
+        return new IPrinter() {
+
+            @Override
+            public void init() throws HyracksDataException {
+                visitorArg.second = listType.getTypeTag();
+            }
+
+            @Override
+            public void print(byte[] b, int start, int l, PrintStream ps) throws HyracksDataException {
+                try {
+                    listPointable.set(b, start, l);
+                    visitorArg.first = ps;
+                    listPointable.accept(visitor, visitorArg);
+                } catch (Exception e) {
+                    // Every failure while visiting the list surfaces as a HyracksDataException.
+                    throw new HyracksDataException(e);
+                }
+            }
+        };
+    }
+}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/clean/schema/AbstractPrinterWithSchemaFactory.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/clean/schema/AbstractPrinterWithSchemaFactory.java
new file mode 100644
index 0000000..b4e780d
--- /dev/null
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/clean/schema/AbstractPrinterWithSchemaFactory.java
@@ -0,0 +1,45 @@
+/*
+* Copyright 2009-2013 by The Regents of the University of California
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* you may obtain a copy of the License from
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.asterix.dataflow.data.nontagged.printers.json.clean.schema;
+
+import org.apache.asterix.om.schema.builder.SchemaBuilder;
+import org.apache.asterix.schema.node.IHeterogeneousTypeComputer;
+
+/**
+ * Base class for schema-aware printer factories: it stores the schema id and type computer
+ * received through the {@link IPrinterWithSchemaFactory} setters and declares a
+ * {@link SchemaBuilder} field for subclasses.
+ */
+public abstract class AbstractPrinterWithSchemaFactory implements IPrinterWithSchemaFactory {
+    private static final long serialVersionUID = 1L;
+
+    /** Schema id set through {@link #setSchemaId(long)}. */
+    protected long schemaId;
+    /** Type computer set through {@link #setTypeComputer(IHeterogeneousTypeComputer)}. */
+    protected IHeterogeneousTypeComputer typeComputer;
+    /** Transient; never assigned by this class itself. */
+    protected transient SchemaBuilder schemaBuilder;
+
+    @Override
+    public void setSchemaId(long schemaId) {
+        this.schemaId = schemaId;
+    }
+
+    @Override
+    public void setTypeComputer(IHeterogeneousTypeComputer typeComputer) {
+        this.typeComputer = typeComputer;
+    }
+
+}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/clean/schema/IPrinterWithSchemaFactory.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/clean/schema/IPrinterWithSchemaFactory.java
new file mode 100644
index 0000000..82a62a0
--- /dev/null
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/clean/schema/IPrinterWithSchemaFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.asterix.dataflow.data.nontagged.printers.json.clean.schema;
+
+import org.apache.asterix.schema.node.IHeterogeneousTypeComputer;
+import org.apache.hyracks.algebricks.data.IPrinterFactory;
+
+/**
+ * A printer factory that can additionally be given a schema id and a type computer.
+ */
+public interface IPrinterWithSchemaFactory extends IPrinterFactory {
+
+    /** Supplies the type computer this factory should use. */
+    void setTypeComputer(IHeterogeneousTypeComputer typeComputer);
+
+    /** Supplies the schema id this factory should use. */
+    void setSchemaId(long schemaId);
+
+}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/base/IDataFormat.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/base/IDataFormat.java
index 3c31f00..1f7fa52 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/base/IDataFormat.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/base/IDataFormat.java
@@ -24,6 +24,7 @@
 import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.schema.node.IHeterogeneousTypeComputer;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Triple;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
@@ -68,6 +69,9 @@
 
     public IPrinterFactoryProvider getCleanJSONPrinterFactoryProvider();
 
+    public IPrinterFactoryProvider getCleanJSONWithSchemaPrinterFactoryProvider(long schemaId,
+            IHeterogeneousTypeComputer typeComputer);
+
     public IMissingWriterFactory getMissingWriterFactory();
 
     public Triple<IScalarEvaluatorFactory, ScalarFunctionCallExpression, IAType> partitioningEvaluatorFactory(
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/AqlCleanJSONWithSchemaPrinterFactoryProvider.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/AqlCleanJSONWithSchemaPrinterFactoryProvider.java
new file mode 100644
index 0000000..754a7e4
--- /dev/null
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/AqlCleanJSONWithSchemaPrinterFactoryProvider.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.formats.nontagged;
+
+import org.apache.asterix.dataflow.data.nontagged.printers.json.clean.schema.AFlatPrinterFactory;
+import org.apache.asterix.dataflow.data.nontagged.printers.json.clean.schema.AObjectPrinterFactory;
+import org.apache.asterix.dataflow.data.nontagged.printers.json.clean.schema.AOptionalFieldPrinterFactory;
+import org.apache.asterix.dataflow.data.nontagged.printers.json.clean.schema.AOrderedlistPrinterFactory;
+import org.apache.asterix.dataflow.data.nontagged.printers.json.clean.schema.ARecordPrinterFactory;
+import org.apache.asterix.dataflow.data.nontagged.printers.json.clean.schema.AUnionPrinterFactory;
+import org.apache.asterix.dataflow.data.nontagged.printers.json.clean.schema.AUnorderedlistPrinterFactory;
+import org.apache.asterix.dataflow.data.nontagged.printers.json.clean.schema.IPrinterWithSchemaFactory;
+import org.apache.asterix.om.types.AOrderedListType;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.AUnionType;
+import org.apache.asterix.om.types.AUnorderedListType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.schema.node.IHeterogeneousTypeComputer;
+import org.apache.hyracks.algebricks.data.IPrinterFactory;
+import org.apache.hyracks.algebricks.data.IPrinterFactoryProvider;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Printer factory provider for the clean JSON format with schema: its instance method hands out
+ * factories configured with the schema id and type computer given at construction.
+ */
+public class AqlCleanJSONWithSchemaPrinterFactoryProvider implements IPrinterFactoryProvider {
+
+    private long schemaId;
+    private IHeterogeneousTypeComputer typeComputer;
+
+    public AqlCleanJSONWithSchemaPrinterFactoryProvider(long schemaId, IHeterogeneousTypeComputer typeComputer) {
+        this.schemaId = schemaId;
+        this.typeComputer = typeComputer;
+    }
+
+    @Override
+    public IPrinterFactory getPrinterFactory(Object type) throws HyracksDataException {
+        return getPrinterFactory(type, schemaId, typeComputer);
+    }
+
+    /**
+     * Returns a printer factory for {@code type}, configured with {@code schemaId} and
+     * {@code typeComputer}.
+     *
+     * @param type
+     *            an {@link IAType}, or {@code null}
+     * @param schemaId
+     *            schema id passed on to the returned factory
+     * @param typeComputer
+     *            type computer passed on to the returned factory
+     * @return the factory specific to the type's tag, or an {@link AObjectPrinterFactory} when
+     *         {@code type} is {@code null} or its tag has no specific factory
+     * @throws HyracksDataException
+     *             wrapping any exception raised while building the tag-specific factory
+     */
+    public static IPrinterFactory getPrinterFactory(Object type, long schemaId, IHeterogeneousTypeComputer typeComputer)
+            throws HyracksDataException {
+        final IAType aqlType = (IAType) type;
+        if (aqlType == null) {
+            return newObjectPrinterFactory(schemaId, typeComputer);
+        }
+        try {
+            switch (aqlType.getTypeTag()) {
+                case RECORD: {
+                    final ARecordPrinterFactory recordFactory = new ARecordPrinterFactory((ARecordType) aqlType);
+                    recordFactory.setSchemaId(schemaId);
+                    recordFactory.setTypeComputer(typeComputer);
+                    return recordFactory;
+                }
+                case ORDEREDLIST: {
+                    final AOrderedlistPrinterFactory orderedFactory = new AOrderedlistPrinterFactory(
+                            (AOrderedListType) aqlType);
+                    orderedFactory.setSchemaId(schemaId);
+                    orderedFactory.setTypeComputer(typeComputer);
+                    return orderedFactory;
+                }
+                case UNORDEREDLIST: {
+                    final AUnorderedlistPrinterFactory unorderedFactory = new AUnorderedlistPrinterFactory(
+                            (AUnorderedListType) aqlType);
+                    unorderedFactory.setSchemaId(schemaId);
+                    unorderedFactory.setTypeComputer(typeComputer);
+                    return unorderedFactory;
+                }
+                case UNION: {
+                    final AUnionType unionType = (AUnionType) aqlType;
+                    final IPrinterWithSchemaFactory unionFactory = unionType.isUnknownableType()
+                            ? new AOptionalFieldPrinterFactory(unionType) : new AUnionPrinterFactory(unionType);
+                    unionFactory.setSchemaId(schemaId);
+                    unionFactory.setTypeComputer(typeComputer);
+                    return unionFactory;
+                }
+                case INT8:
+                case INT16:
+                case INT32:
+                case INT64:
+                case NULL:
+                case BOOLEAN:
+                case FLOAT:
+                case DOUBLE:
+                case TIME:
+                case DATE:
+                case DATETIME:
+                case DURATION:
+                case YEARMONTHDURATION:
+                case DAYTIMEDURATION:
+                case INTERVAL:
+                case POINT:
+                case POINT3D:
+                case LINE:
+                case POLYGON:
+                case CIRCLE:
+                case RECTANGLE:
+                case STRING:
+                case BINARY:
+                case UUID:
+                case SHORTWITHOUTTYPEINFO: {
+                    final AFlatPrinterFactory flatFactory = new AFlatPrinterFactory(aqlType);
+                    flatFactory.setSchemaId(schemaId);
+                    flatFactory.setTypeComputer(typeComputer);
+                    return flatFactory;
+                }
+                case ANY:
+                case BITARRAY:
+                case ENUM:
+                case SPARSERECORD:
+                case SYSTEM_NULL:
+                case TYPE:
+                case UINT16:
+                case UINT32:
+                case UINT64:
+                case UINT8:
+                default:
+                    // These types are not intended to be printed to the user.
+                    // Like any tag not listed here, they get the generic object printer factory below.
+                    break;
+            }
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+        return newObjectPrinterFactory(schemaId, typeComputer);
+    }
+
+    /**
+     * Creates the generic object printer factory used when no tag-specific factory applies.
+     */
+    private static IPrinterFactory newObjectPrinterFactory(long schemaId, IHeterogeneousTypeComputer typeComputer)
+            throws HyracksDataException {
+        final AObjectPrinterFactory objectFactory = new AObjectPrinterFactory();
+        objectFactory.setSchemaId(schemaId);
+        objectFactory.setTypeComputer(typeComputer);
+        return objectFactory;
+    }
+}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/AListPrinter.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/AListPrinter.java
index 704d06e..8f31384 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/AListPrinter.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/AListPrinter.java
@@ -34,11 +34,11 @@
  * This class is to print the content of a list.
  */
 public class AListPrinter {
-    private final String startList;
-    private final String endList;
-    private final String separator;
+    protected final String startList;
+    protected final String endList;
+    protected final String separator;
 
-    private final Pair<PrintStream, ATypeTag> itemVisitorArg = new Pair<>(null, null);
+    protected final Pair<PrintStream, ATypeTag> itemVisitorArg = new Pair<>(null, null);
 
     public AListPrinter(String startList, String endList, String separator) {
         this.startList = startList;
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/ARecordPrinter.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/ARecordPrinter.java
index 1a91d8b..3c0d88f 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/ARecordPrinter.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/ARecordPrinter.java
@@ -34,13 +34,13 @@
  * This class is to print the content of a record.
  */
 public class ARecordPrinter {
-    private final String startRecord;
-    private final String endRecord;
-    private final String fieldSeparator;
-    private final String fieldNameSeparator;
+    protected final String startRecord;
+    protected final String endRecord;
+    protected final String fieldSeparator;
+    protected final String fieldNameSeparator;
 
-    private final Pair<PrintStream, ATypeTag> nameVisitorArg = new Pair<>(null, ATypeTag.STRING);
-    private final Pair<PrintStream, ATypeTag> itemVisitorArg = new Pair<>(null, null);
+    protected final Pair<PrintStream, ATypeTag> nameVisitorArg = new Pair<>(null, ATypeTag.STRING);
+    protected final Pair<PrintStream, ATypeTag> itemVisitorArg = new Pair<>(null, null);
 
     public ARecordPrinter(final String startRecord, final String endRecord, final String fieldSeparator,
             final String fieldNameSeparator) {
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/json/clean/schema/AListPrinterSchema.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/json/clean/schema/AListPrinterSchema.java
new file mode 100644
index 0000000..2df65a0
--- /dev/null
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/json/clean/schema/AListPrinterSchema.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.om.pointables.printer.json.clean.schema;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.List;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.om.pointables.AListVisitablePointable;
+import org.apache.asterix.om.pointables.base.IVisitablePointable;
+import org.apache.asterix.om.pointables.printer.AListPrinter;
+import org.apache.asterix.om.pointables.printer.IPrintVisitor;
+import org.apache.asterix.om.schema.builder.SchemaBuilder;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.EnumDeserializer;
+import org.apache.asterix.schema.node.AbstractNestedSchemaNode;
+
+/**
+ * List printer that, besides writing the list items to the output stream, keeps the
+ * {@link SchemaBuilder} of the calling {@link APrintVisitor} in step with the list being printed.
+ */
+public class AListPrinterSchema extends AListPrinter {
+
+    public AListPrinterSchema(String startList, String endList, String separator) {
+        super(startList, endList, separator);
+    }
+
+    /**
+     * Prints the list delimited by {@code startList}/{@code endList}, with {@code separator}
+     * between consecutive items, while opening and closing a list scope on the schema builder.
+     *
+     * @param listAccessor the list to print
+     * @param ps the stream the output is written to
+     * @param visitor must be an {@link APrintVisitor}; it supplies the schema builder
+     */
+    @Override
+    public void printList(AListVisitablePointable listAccessor, PrintStream ps, IPrintVisitor visitor)
+            throws IOException, AsterixException {
+        final List<IVisitablePointable> tags = listAccessor.getItemTags();
+        final List<IVisitablePointable> values = listAccessor.getItems();
+        itemVisitorArg.first = ps;
+
+        final SchemaBuilder schemaBuilder = ((APrintVisitor) visitor).getSchemaBuilder();
+        if (!schemaBuilder.isInitialized()) {
+            schemaBuilder.setTuple();
+        }
+        final AbstractNestedSchemaNode enclosing = schemaBuilder.enterNestingWithList();
+
+        ps.print(startList);
+        final int count = values.size();
+        for (int idx = 0; idx < count; idx++) {
+            // the separator goes between items, never before the first one
+            if (idx > 0) {
+                ps.print(separator);
+            }
+            printItem(visitor, tags, values, idx);
+        }
+        ps.print(endList);
+
+        schemaBuilder.exitNesting(enclosing);
+        enclosing.finisedFirstRecord();
+    }
+
+    /**
+     * Visits item {@code i}. The item's type tag is read from the first byte of its tag
+     * pointable; an item whose value is at most one byte long is visited as {@code NULL}.
+     */
+    private void printItem(IPrintVisitor visitor, List<IVisitablePointable> itemTags, List<IVisitablePointable> items,
+            int i) throws AsterixException {
+        final IVisitablePointable tagPointable = itemTags.get(i);
+        final IVisitablePointable value = items.get(i);
+        final byte tagByte = tagPointable.getByteArray()[tagPointable.getStartOffset()];
+        final ATypeTag declaredTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(tagByte);
+        if (value.getLength() <= 1) {
+            itemVisitorArg.second = ATypeTag.NULL;
+        } else {
+            itemVisitorArg.second = declaredTag;
+        }
+        value.accept(visitor, itemVisitorArg);
+    }
+}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/json/clean/schema/APrintVisitor.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/json/clean/schema/APrintVisitor.java
new file mode 100644
index 0000000..dbc46d3
--- /dev/null
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/json/clean/schema/APrintVisitor.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.om.pointables.printer.json.clean.schema;
+
+import java.io.PrintStream;
+
+import org.apache.asterix.dataflow.data.nontagged.printers.json.clean.AObjectPrinterFactory;
+import org.apache.asterix.om.pointables.AListVisitablePointable;
+import org.apache.asterix.om.pointables.ARecordVisitablePointable;
+import org.apache.asterix.om.pointables.printer.AListPrinter;
+import org.apache.asterix.om.pointables.printer.ARecordPrinter;
+import org.apache.asterix.om.pointables.printer.AbstractPrintVisitor;
+import org.apache.asterix.om.schema.builder.SchemaBuilder;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * An {@link AbstractPrintVisitor} that prints records, lists and flat values in Clean JSON
+ * format and, as a side effect of printing, reports what it sees to a {@link SchemaBuilder}
+ * so that the schema of the printed output can be inferred.
+ */
+public class APrintVisitor extends AbstractPrintVisitor {
+
+    private final SchemaBuilder schemaBuilder;
+
+    /**
+     * @param schemaBuilder
+     *            the builder that receives field names and value types while the result is printed
+     */
+    public APrintVisitor(SchemaBuilder schemaBuilder) {
+        this.schemaBuilder = schemaBuilder;
+    }
+
+    @Override
+    protected AListPrinter createListPrinter(AListVisitablePointable accessor) {
+        return new AListPrinterSchema("[ ", " ]", ", ");
+    }
+
+    @Override
+    protected ARecordPrinter createRecordPrinter(ARecordVisitablePointable accessor) {
+        return new ARecordPrinterSchema("{ ", " }", ", ", ": ");
+    }
+
+    /**
+     * Hands the flat value to the schema builder, as a field name while the builder is in
+     * field-name mode and as a value type otherwise, then prints it.
+     */
+    @Override
+    protected boolean printFlatValue(ATypeTag typeTag, byte[] b, int s, int l, PrintStream ps)
+            throws HyracksDataException {
+        if (!schemaBuilder.isPrintFieldName()) {
+            schemaBuilder.setSchema(typeTag);
+        } else {
+            schemaBuilder.setFieldName(b, s, l);
+        }
+        return AObjectPrinterFactory.printFlatValue(typeTag, b, s, l, ps);
+    }
+
+    public SchemaBuilder getSchemaBuilder() {
+        return schemaBuilder;
+    }
+}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/json/clean/schema/ARecordPrinterSchema.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/json/clean/schema/ARecordPrinterSchema.java
new file mode 100644
index 0000000..0c0a109
--- /dev/null
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/json/clean/schema/ARecordPrinterSchema.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.om.pointables.printer.json.clean.schema;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.List;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.om.pointables.ARecordVisitablePointable;
+import org.apache.asterix.om.pointables.base.IVisitablePointable;
+import org.apache.asterix.om.pointables.printer.ARecordPrinter;
+import org.apache.asterix.om.pointables.printer.IPrintVisitor;
+import org.apache.asterix.om.schema.builder.SchemaBuilder;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.EnumDeserializer;
+import org.apache.asterix.schema.node.AbstractNestedSchemaNode;
+
+/**
+ * Record printer that, besides writing the record to the output stream, drives the
+ * {@link SchemaBuilder} of the calling {@link APrintVisitor} with the field names and values
+ * it prints.
+ */
+public class ARecordPrinterSchema extends ARecordPrinter {
+
+    /**
+     * @param startRecord
+     *            text printed before the first field
+     * @param endRecord
+     *            text printed after the last field
+     * @param fieldSeparator
+     *            text printed between two fields
+     * @param fieldNameSeparator
+     *            text printed between a field name and its value; when {@code null}, field
+     *            names are not printed
+     */
+    public ARecordPrinterSchema(final String startRecord, final String endRecord, final String fieldSeparator,
+            final String fieldNameSeparator) {
+        super(startRecord, endRecord, fieldSeparator, fieldNameSeparator);
+    }
+
+    /**
+     * Prints the fields of the record, skipping those whose value is tagged {@code MISSING},
+     * and afterwards calls {@code finisedFirstRecord()} on the builder's current schema node.
+     *
+     * @param visitor
+     *            must be an {@link APrintVisitor}; it supplies the schema builder
+     */
+    @Override
+    public void printRecord(ARecordVisitablePointable recordAccessor, PrintStream ps, IPrintVisitor visitor)
+            throws IOException, AsterixException {
+        final List<IVisitablePointable> names = recordAccessor.getFieldNames();
+        final List<IVisitablePointable> values = recordAccessor.getFieldValues();
+
+        nameVisitorArg.first = ps;
+        itemVisitorArg.first = ps;
+
+        ps.print(startRecord);
+
+        // the schema root is created here unless the builder already has one
+        final SchemaBuilder schemaBuilder = ((APrintVisitor) visitor).getSchemaBuilder();
+        if (!schemaBuilder.isInitialized()) {
+            schemaBuilder.setTuple();
+        }
+
+        final int fieldCount = names.size();
+        IVisitablePointable value = fieldCount > 0 ? values.get(0) : null;
+        ATypeTag tag = value != null ? tagOf(value) : null;
+        for (int idx = 0; idx < fieldCount; ++idx) {
+            final IVisitablePointable name = names.get(idx);
+            if (tag != ATypeTag.MISSING) {
+                printField(ps, visitor, name, value, tag);
+            }
+
+            // after the last field there is no next value to look at
+            if (idx == fieldCount - 1) {
+                break;
+            }
+            value = values.get(idx + 1);
+            final ATypeTag upcomingTag = tagOf(value);
+            final boolean leadingFieldMissing = idx == 0 && tag == ATypeTag.MISSING;
+            if (!leadingFieldMissing && upcomingTag != ATypeTag.MISSING) {
+                ps.print(fieldSeparator);
+            }
+            tag = upcomingTag;
+        }
+        ps.print(endRecord);
+
+        schemaBuilder.getCurrent().finisedFirstRecord();
+    }
+
+    /** Reads the type tag stored in the byte at the start offset of the given pointable. */
+    private static ATypeTag tagOf(IVisitablePointable pointable) {
+        final byte tagByte = pointable.getByteArray()[pointable.getStartOffset()];
+        return EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(tagByte);
+    }
+
+    /**
+     * Prints one field. While the name is visited the builder is in field-name mode; the
+     * value is visited inside a record scope of the builder.
+     */
+    private void printField(PrintStream ps, IPrintVisitor visitor, IVisitablePointable fieldName,
+            IVisitablePointable fieldValue, ATypeTag fieldTypeTag) throws AsterixException {
+        final SchemaBuilder schemaBuilder = ((APrintVisitor) visitor).getSchemaBuilder();
+
+        itemVisitorArg.second = fieldTypeTag;
+        if (fieldNameSeparator != null) {
+            schemaBuilder.setPrintFieldName();
+            fieldName.accept(visitor, nameVisitorArg);
+            schemaBuilder.unsetPrintFieldName();
+            ps.print(fieldNameSeparator);
+        }
+
+        final AbstractNestedSchemaNode enclosing = schemaBuilder.enterNestingWithRecord();
+        fieldValue.accept(visitor, itemVisitorArg);
+        schemaBuilder.exitNesting(enclosing);
+    }
+}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/schema/builder/HeterogeneousTypeComputerLoader.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/schema/builder/HeterogeneousTypeComputerLoader.java
new file mode 100644
index 0000000..2474db3
--- /dev/null
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/schema/builder/HeterogeneousTypeComputerLoader.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.om.schema.builder;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.asterix.schema.node.IHeterogeneousTypeComputer;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+
+/**
+ * Loads and caches {@link org.apache.asterix.schema.node.IHeterogeneousTypeComputer}
+ * implementations. Short names are mapped to class names by the
+ * {@code /schema-inferencers.properties} classpath resource; a name without a mapping is
+ * treated as a class name.
+ *
+ * @author wail
+ */
+public class HeterogeneousTypeComputerLoader {
+    // Declared before INSTANCE on purpose: static fields are initialized in textual order and
+    // the constructor that INSTANCE runs uses the logger.
+    private static final java.util.logging.Logger LOGGER = java.util.logging.Logger
+            .getLogger(HeterogeneousTypeComputerLoader.class.getName());
+    private static final String MAPPING_RESOURCE = "/schema-inferencers.properties";
+
+    public static final HeterogeneousTypeComputerLoader INSTANCE = new HeterogeneousTypeComputerLoader();
+
+    private final Map<String, String> nameClassNameMap = new HashMap<>();
+    private final Map<String, IHeterogeneousTypeComputer> loadedTypeComputer = new HashMap<>();
+
+    /**
+     * Reads the name-to-class mapping. Loading is best effort: when the resource is missing or
+     * cannot be read, a warning is logged and the mapping stays empty.
+     */
+    private HeterogeneousTypeComputerLoader() {
+        // try-with-resources closes the stream even when load() throws
+        try (InputStream in = getClass().getResourceAsStream(MAPPING_RESOURCE)) {
+            if (in == null) {
+                LOGGER.warning("Resource " + MAPPING_RESOURCE + " not found; no type computer names are registered");
+                return;
+            }
+            final Properties prop = new Properties();
+            prop.load(in);
+            for (String name : prop.stringPropertyNames()) {
+                nameClassNameMap.put(name, prop.getProperty(name));
+            }
+        } catch (IOException e) {
+            LOGGER.log(java.util.logging.Level.WARNING, "Failed to read " + MAPPING_RESOURCE, e);
+        }
+    }
+
+    /**
+     * Returns the type computer registered under the given name, instantiating and caching it
+     * on first use.
+     *
+     * @param typeComputerName
+     *            a name from the mapping resource, or the name of the class to instantiate
+     * @return the cached instance for that name
+     * @throws AlgebricksException
+     *             if the class cannot be found or instantiated
+     */
+    public IHeterogeneousTypeComputer getTypeComputer(String typeComputerName) throws AlgebricksException {
+        // The cache is a plain HashMap, so lookups as well as inserts are done under its lock.
+        synchronized (loadedTypeComputer) {
+            IHeterogeneousTypeComputer typeComputer = loadedTypeComputer.get(typeComputerName);
+            if (typeComputer == null) {
+                String className = nameClassNameMap.get(typeComputerName);
+                if (className == null) {
+                    className = typeComputerName;
+                }
+                try {
+                    typeComputer = (IHeterogeneousTypeComputer) Class.forName(className).newInstance();
+                } catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
+                    throw new AlgebricksException(e);
+                }
+                loadedTypeComputer.put(typeComputerName, typeComputer);
+            }
+            return typeComputer;
+        }
+    }
+
+    /**
+     * Returns the names found in the mapping resource, as a view of the internal map's keys.
+     */
+    public Set<String> getNameClassNameMap() {
+        return nameClassNameMap.keySet();
+    }
+}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/schema/builder/SchemaBuilder.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/schema/builder/SchemaBuilder.java
new file mode 100644
index 0000000..e702257
--- /dev/null
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/schema/builder/SchemaBuilder.java
@@ -0,0 +1,309 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.asterix.om.schema.builder;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+
+import org.apache.asterix.dataflow.data.nontagged.printers.json.clean.AObjectPrinterFactory;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.schema.node.AbstractNestedSchemaNode;
+import org.apache.asterix.schema.node.FlatSchemaNode;
+import org.apache.asterix.schema.node.IHeterogeneousTypeComputer;
+import org.apache.asterix.schema.node.ISchemaNode;
+import org.apache.asterix.schema.node.ListSchemaNode;
+import org.apache.asterix.schema.node.RecordSchemaNode;
+import org.apache.asterix.schema.node.pool.NodesObjectPool;
+import org.apache.asterix.schema.service.SchemaReporterService;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Infers the schema of a result while it is being printed: callers open and close a scope
+ * for every list and record field and report the type tag of every flat value. The
+ * {@link IHeterogeneousTypeComputer} given at construction is handed to the node pool.
+ *
+ * @author Wail Alkowaileet
+ */
+public class SchemaBuilder {
+
+    private final long schemaId;
+    private final NodesObjectPool pool;
+    private final ByteArrayOutputStream fieldNameByteStream;
+    private final PrintStream fieldNamePrintStream;
+
+    /** Root node of the schema; {@code null} until {@link #setTuple()} is called. */
+    private AbstractNestedSchemaNode tuple;
+    /** Node of the scope that is currently open. */
+    private AbstractNestedSchemaNode current;
+    /** Flag raised while field names, rather than values, are being printed. */
+    private boolean printFieldName;
+    /** Name captured by {@link #setFieldName(byte[], int, int)} for the next record scope. */
+    private String currentFieldName;
+
+    /**
+     * Creates a builder for one schema id.
+     *
+     * @param typeComputer
+     *            passed on to the {@link NodesObjectPool} this builder takes its nodes from
+     * @param schemaId
+     *            id under which the root node is registered with the {@link SchemaReporterService}
+     */
+    public SchemaBuilder(IHeterogeneousTypeComputer typeComputer, long schemaId) {
+        this.schemaId = schemaId;
+        this.pool = new NodesObjectPool(typeComputer);
+        this.tuple = null;
+        this.current = null;
+        this.printFieldName = false;
+        this.fieldNameByteStream = new ByteArrayOutputStream();
+        this.fieldNamePrintStream = new PrintStream(fieldNameByteStream);
+    }
+
+    /**
+     * Leaves the scope opened by the matching {@code enterNestingWith...} call. Nothing happens
+     * when {@code parent} already is the current node. Otherwise the node being left is added to
+     * {@code parent} if it has not been seen before and its inner type is set, and
+     * {@code parent} becomes the current node again.
+     *
+     * @param parent
+     *            the node returned by the matching {@code enterNestingWith...} call
+     */
+    public void exitNesting(AbstractNestedSchemaNode parent) {
+        if (parent == current) {
+            return;
+        }
+        if (!current.isSeen() && current.isInnerTypeSet()) {
+            parent.addItem(current);
+        }
+        current = parent;
+    }
+
+    public ISchemaNode getTuple() {
+        return tuple;
+    }
+
+    public AbstractNestedSchemaNode getCurrent() {
+        return current;
+    }
+
+    private void reportSchema() {
+        SchemaReporterService.INSTANCE.registerSchema(schemaId, tuple);
+    }
+
+    public boolean isInitialized() {
+        return tuple != null;
+    }
+
+    /**
+     * Creates the root of the schema as a list node, makes it the current node and registers it
+     * with the {@link SchemaReporterService}.
+     *
+     * @return the new root node
+     */
+    public ISchemaNode setTuple() {
+        final AbstractNestedSchemaNode root = pool.getListNode();
+        tuple = root;
+        current = root;
+        root.setSeen(true);
+        root.setTuple();
+        reportSchema();
+        return root;
+    }
+
+    /**
+     * Opens a list scope below the current node, reusing the current node's inner list node when
+     * there is one. Does nothing when the current node has stopped inferring.
+     *
+     * @return the node that was current before the call; hand it to {@link #exitNesting}
+     */
+    public AbstractNestedSchemaNode enterNestingWithList() {
+        final AbstractNestedSchemaNode parent = current;
+        if (parent.isStopInfer()) {
+            return parent;
+        }
+        current = parent.getInnerListNode();
+        if (current != null) {
+            current.setSeen(true);
+            return parent;
+        }
+        current = pool.getListNode();
+        current.setSeen(false);
+        if (!parent.isFirstPass()) {
+            current.setAsNullabe();
+        }
+        return parent;
+    }
+
+    /**
+     * Opens a scope for the field named by the last captured field name, reusing the current
+     * node's inner field of that name when there is one, and then clears the captured name.
+     * Does nothing when the current node has stopped inferring.
+     *
+     * @return the node that was current before the call; hand it to {@link #exitNesting}
+     */
+    public AbstractNestedSchemaNode enterNestingWithRecord() {
+        final AbstractNestedSchemaNode parent = current;
+        if (parent.isStopInfer()) {
+            return parent;
+        }
+        current = parent.getInnerField(currentFieldName);
+        if (current != null) {
+            current.setSeen(true);
+        } else {
+            current = pool.getRecrodNode();
+            ((RecordSchemaNode) current).setFieldName(currentFieldName);
+            current.setSeen(false);
+            if (!parent.isFirstPass()) {
+                current.setAsNullabe();
+            }
+        }
+        currentFieldName = "";
+        return parent;
+    }
+
+    /**
+     * Raises the flag that marks the values printed next as field names.
+     */
+    public void setPrintFieldName() {
+        printFieldName = true;
+    }
+
+    /**
+     * Lowers the field-name flag again.
+     */
+    public void unsetPrintFieldName() {
+        printFieldName = false;
+    }
+
+    public boolean isPrintFieldName() {
+        return printFieldName;
+    }
+
+    /**
+     * Captures a field name: prints the value as a {@code STRING} into an internal buffer and
+     * keeps the printed text, with every double quote removed, as the current field name.
+     *
+     * @param bs
+     *            bytes of the value, passed on to the printer unchanged
+     * @param s
+     *            passed on to the printer unchanged (presumably the start offset)
+     * @param l
+     *            passed on to the printer unchanged (presumably the length)
+     * @throws HyracksDataException
+     *             if the printer throws it
+     */
+    public void setFieldName(byte[] bs, int s, int l) throws HyracksDataException {
+        fieldNameByteStream.reset();
+        AObjectPrinterFactory.printFlatValue(ATypeTag.STRING, bs, s, l, fieldNamePrintStream);
+        final String printed = fieldNameByteStream.toString();
+        currentFieldName = printed.replaceAll("\"", "");
+    }
+
+    /** Schema of a point: a list node holding a double. */
+    private ListSchemaNode getPointNode() {
+        final ListSchemaNode node = pool.getListNode();
+        node.addItem(FlatSchemaNode.DOUBLE_NODE);
+        return node;
+    }
+
+    /** Schema of a polygon: a list node holding a point node. */
+    private ListSchemaNode getPolygonNode() {
+        final ListSchemaNode node = pool.getListNode();
+        node.addItem(getPointNode());
+        return node;
+    }
+
+    /** Schema of a circle: a list node holding a point node and a double. */
+    private ListSchemaNode getCircleNode() {
+        final ListSchemaNode node = pool.getListNode();
+        node.addItem(getPointNode());
+        node.addItem(FlatSchemaNode.DOUBLE_NODE);
+        return node;
+    }
+
+    /** Schema of a year-month duration: a record node named "year-month-duration" holding a long. */
+    private RecordSchemaNode getYearMonthDuration() {
+        final RecordSchemaNode node = pool.getRecrodNode();
+        node.setFieldName("year-month-duration");
+        node.addItem(FlatSchemaNode.LONG_NODE);
+        return node;
+    }
+
+    /**
+     * Adds the schema node that corresponds to the given type tag to the current node. Tags
+     * without a case below are ignored, and nothing is added once the current node has stopped
+     * inferring.
+     *
+     * @param typeTag
+     *            type tag of the value that is being printed
+     */
+    public void setSchema(ATypeTag typeTag) {
+        if (current.isStopInfer()) {
+            return;
+        }
+        switch (typeTag) {
+            case NULL:
+                current.addItem(FlatSchemaNode.NULL_NODE);
+                break;
+            case BOOLEAN:
+                current.addItem(FlatSchemaNode.BOOLEAN_NODE);
+                break;
+            // every integer width is reported as a long
+            case INT8:
+            case INT16:
+            case INT32:
+            case INT64:
+            case SHORTWITHOUTTYPEINFO:
+                current.addItem(FlatSchemaNode.LONG_NODE);
+                break;
+            case FLOAT:
+            case DOUBLE:
+                current.addItem(FlatSchemaNode.DOUBLE_NODE);
+                break;
+            case STRING:
+                // a string printed in field-name mode is a name, not a value
+                if (!isPrintFieldName()) {
+                    current.addItem(FlatSchemaNode.STRING_NODE);
+                }
+                break;
+            // these types are reported as strings
+            case DATE:
+            case TIME:
+            case DATETIME:
+            case DURATION:
+            case DAYTIMEDURATION:
+            case BINARY:
+            case UUID:
+                current.addItem(FlatSchemaNode.STRING_NODE);
+                break;
+            case POINT:
+            case POINT3D:
+                current.addItem(getPointNode());
+                break;
+            case LINE: // a line gets the same schema as a polygon
+            case POLYGON:
+            case RECTANGLE:
+                current.addItem(getPolygonNode());
+                break;
+            case CIRCLE:
+                current.addItem(getCircleNode());
+                break;
+            case YEARMONTHDURATION:
+                current.addItem(getYearMonthDuration());
+                break;
+            default:
+                break;
+        }
+    }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/formats/NonTaggedDataFormat.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/formats/NonTaggedDataFormat.java
index 64735ff..595530d 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/formats/NonTaggedDataFormat.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/formats/NonTaggedDataFormat.java
@@ -36,6 +36,7 @@
 import org.apache.asterix.formats.nontagged.AqlBinaryIntegerInspector;
 import org.apache.asterix.formats.nontagged.AqlCSVPrinterFactoryProvider;
 import org.apache.asterix.formats.nontagged.AqlCleanJSONPrinterFactoryProvider;
+import org.apache.asterix.formats.nontagged.AqlCleanJSONWithSchemaPrinterFactoryProvider;
 import org.apache.asterix.formats.nontagged.AqlLosslessJSONPrinterFactoryProvider;
 import org.apache.asterix.formats.nontagged.AqlNormalizedKeyComputerFactoryProvider;
 import org.apache.asterix.formats.nontagged.AqlPredicateEvaluatorFactoryProvider;
@@ -74,6 +75,7 @@
 import org.apache.asterix.runtime.evaluators.functions.records.FieldAccessByIndexEvalFactory;
 import org.apache.asterix.runtime.evaluators.functions.records.FieldAccessByNameDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.records.FieldAccessNestedEvalFactory;
+import org.apache.asterix.schema.node.IHeterogeneousTypeComputer;
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -632,6 +634,16 @@
         return AqlCSVPrinterFactoryProvider.INSTANCE;
     }
 
+    /**
+     * Creates a new {@link AqlCleanJSONWithSchemaPrinterFactoryProvider} for the given schema id
+     * and type computer; a fresh provider is returned on every call.
+     */
+    @Override
+    public IPrinterFactoryProvider getCleanJSONWithSchemaPrinterFactoryProvider(long schemaId,
+            IHeterogeneousTypeComputer typeComputer) {
+        return new AqlCleanJSONWithSchemaPrinterFactoryProvider(schemaId, typeComputer);
+    }
+
     @SuppressWarnings("unchecked")
     @Override
     public IScalarEvaluatorFactory getConstantEvalFactory(IAlgebricksConstantValue value) throws AlgebricksException {
diff --git a/asterixdb/asterix-schema-spark/pom.xml b/asterixdb/asterix-schema-spark/pom.xml
new file mode 100644
index 0000000..18f89a8
--- /dev/null
+++ b/asterixdb/asterix-schema-spark/pom.xml
@@ -0,0 +1,51 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements.  See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership.  The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License.  You may obtain a copy of the License at
+ !
+ !   http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied.  See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+	<parent>
+		<artifactId>apache-asterixdb</artifactId>
+		<groupId>org.apache.asterix</groupId>
+		<version>0.8.9-SNAPSHOT</version>
+	</parent>
+	<artifactId>asterix-schema-spark</artifactId>
+
+  <licenses>
+    <license>
+      <name>Apache License, Version 2.0</name>
+      <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+      <distribution>repo</distribution>
+      <comments>A business-friendly OSS license</comments>
+    </license>
+  </licenses>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.asterix</groupId>
+			<artifactId>asterix-common</artifactId>
+			<version>0.8.9-SNAPSHOT</version>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.asterix</groupId>
+			<artifactId>asterix-schema</artifactId>
+			<version>0.8.9-SNAPSHOT</version>
+			<scope>compile</scope>
+		</dependency>
+	</dependencies>
+</project>
diff --git a/asterixdb/asterix-schema-spark/src/main/java/org/apache/asterix/schema/spark/SparkHeterogeneousTypeComputer.java b/asterixdb/asterix-schema-spark/src/main/java/org/apache/asterix/schema/spark/SparkHeterogeneousTypeComputer.java
new file mode 100644
index 0000000..9290c46
--- /dev/null
+++ b/asterixdb/asterix-schema-spark/src/main/java/org/apache/asterix/schema/spark/SparkHeterogeneousTypeComputer.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.asterix.schema.spark;
+
+import org.apache.asterix.schema.node.IHeterogeneousTypeComputer;
+import org.apache.asterix.schema.node.ISchemaNodeType;
+import org.apache.asterix.schema.node.type.FlatSchemaNodeType;
+
+/**
+ * Spark heterogeneous type computer.
+ * When the same field has more than one type, Spark casts it to String (except for numeric types).
+ *
+ * @author wail
+ */
+public class SparkHeterogeneousTypeComputer implements IHeterogeneousTypeComputer {
+    private static final long serialVersionUID = 1L;
+
+    /** {@code INT64} and {@code DOUBLE} are the types treated as numeric. */
+    private boolean isNumeric(ISchemaNodeType type) {
+        return type == FlatSchemaNodeType.INT64 || type == FlatSchemaNodeType.DOUBLE;
+    }
+
+    /**
+     * Resolves two primitive types seen for the same field.
+     * <ul>
+     * <li>identical types resolve to themselves</li>
+     * <li>{@code NULL} yields to the other type</li>
+     * <li>two different numeric types resolve to {@code DOUBLE}</li>
+     * <li>every other combination resolves to {@code STRING}</li>
+     * </ul>
+     */
+    @Override
+    public ISchemaNodeType computePrimitiveType(ISchemaNodeType type1, ISchemaNodeType type2) {
+        // Nothing to reconcile. Without this guard INT64 paired with INT64 would widen to
+        // DOUBLE, and other identical non-string pairs would fall through to STRING.
+        if (type1 == type2) {
+            return type1;
+        }
+
+        // NULL is generic and is of all types
+        if (type1 == FlatSchemaNodeType.NULL) {
+            return type2;
+        }
+        if (type2 == FlatSchemaNodeType.NULL) {
+            return type1;
+        }
+
+        // the types differ here, so two numeric types are one INT64 and one DOUBLE
+        if (isNumeric(type1) && isNumeric(type2)) {
+            return FlatSchemaNodeType.DOUBLE;
+        }
+
+        // anything involving a string, and any other mix such as numeric and boolean
+        return FlatSchemaNodeType.STRING;
+    }
+
+    /**
+     * Spark Schema inferencer infers heterogeneous types as strings
+     */
+    @Override
+    public ISchemaNodeType computeNestedType(ISchemaNodeType type1, ISchemaNodeType type2) {
+        return FlatSchemaNodeType.STRING;
+    }
+
+    /**
+     * A {@code NULL} on one side resolves to the nullable variant of the other type; any other
+     * combination resolves to {@code STRING}.
+     */
+    @Override
+    public ISchemaNodeType computePrimitiveWithNestedType(ISchemaNodeType type1, ISchemaNodeType type2) {
+        if (type1 == FlatSchemaNodeType.NULL) {
+            return type2.getNullableType();
+        }
+        if (type2 == FlatSchemaNodeType.NULL) {
+            return type1.getNullableType();
+        }
+        return FlatSchemaNodeType.STRING;
+    }
+
+    @Override
+    public boolean stopInferring() {
+        return true;
+    }
+
+    @Override
+    public String getName() {
+        return "Spark";
+    }
+}
diff --git a/asterixdb/asterix-schema/pom.xml b/asterixdb/asterix-schema/pom.xml
new file mode 100644
index 0000000..9cbe1b1
--- /dev/null
+++ b/asterixdb/asterix-schema/pom.xml
@@ -0,0 +1,45 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements.  See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership.  The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License.  You may obtain a copy of the License at
+ !
+ !   http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied.  See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+	<!-- groupId and version are not declared on this module; they are inherited from the parent. -->
+	<parent>
+		<artifactId>apache-asterixdb</artifactId>
+		<groupId>org.apache.asterix</groupId>
+		<version>0.8.9-SNAPSHOT</version>
+	</parent>
+	<artifactId>asterix-schema</artifactId>
+
+  <licenses>
+    <license>
+      <name>Apache License, Version 2.0</name>
+      <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+      <distribution>repo</distribution>
+      <comments>A business-friendly OSS license</comments>
+    </license>
+  </licenses>
+
+	<dependencies>
+		<!-- The only declared dependency; its version is pinned to the same value as the parent version above. -->
+		<dependency>
+			<groupId>org.apache.asterix</groupId>
+			<artifactId>asterix-common</artifactId>
+			<version>0.8.9-SNAPSHOT</version>
+			<scope>compile</scope>
+		</dependency>
+	</dependencies>
+</project>
diff --git a/asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/message/SchemaMessage.java b/asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/message/SchemaMessage.java
new file mode 100644
index 0000000..dcfbf34
--- /dev/null
+++ b/asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/message/SchemaMessage.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.asterix.schema.message;
+
+import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.schema.node.ISchemaNode;
+
+/**
+ * Application message carrying an inferred schema together with the id of
+ * the schema it belongs to.
+ * Sender: NC.
+ */
+public class SchemaMessage extends AbstractApplicationMessage {
+
+    private static final long serialVersionUID = 1L;
+
+    private final long schemaId;
+    private final ISchemaNode schema;
+
+    /**
+     * Builds a message for one schema id.
+     *
+     * @param schemaId
+     *            id of the schema this message refers to
+     * @param schema
+     *            root node of the schema to ship
+     */
+    public SchemaMessage(long schemaId, ISchemaNode schema) {
+        this.schema = schema;
+        this.schemaId = schemaId;
+    }
+
+    /**
+     * @return the id passed at construction time
+     */
+    public long getSchemaId() {
+        return schemaId;
+    }
+
+    /**
+     * @return the schema root passed at construction time
+     */
+    public ISchemaNode getSchema() {
+        return schema;
+    }
+
+    /**
+     * @return always {@code ApplicationMessageType.SCHEMA_MESSAGE}
+     */
+    @Override
+    public ApplicationMessageType getMessageType() {
+        return ApplicationMessageType.SCHEMA_MESSAGE;
+    }
+}
diff --git a/asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/message/SchemaRegisterMessage.java b/asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/message/SchemaRegisterMessage.java
new file mode 100644
index 0000000..f0cb349
--- /dev/null
+++ b/asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/message/SchemaRegisterMessage.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.asterix.schema.message;
+
+import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+
+/**
+ * Application message used to register a schema id. The message type is
+ * supplied by the creator and may be changed afterwards.
+ * Sender: both CC and NC.
+ */
+public class SchemaRegisterMessage extends AbstractApplicationMessage {
+    private static final long serialVersionUID = 1L;
+
+    private final long schemaId;
+    private ApplicationMessageType messageType;
+
+    /**
+     * Builds a register message.
+     *
+     * @param schemaId
+     *            id of the schema being registered
+     * @param messageType
+     *            type this message reports from {@link #getMessageType()}
+     */
+    public SchemaRegisterMessage(long schemaId, ApplicationMessageType messageType) {
+        this.messageType = messageType;
+        this.schemaId = schemaId;
+    }
+
+    /**
+     * @return the current message type (constructor value or the last one set)
+     */
+    @Override
+    public ApplicationMessageType getMessageType() {
+        return messageType;
+    }
+
+    /**
+     * Replaces the message type of this message.
+     *
+     * @param messageType
+     *            the new type
+     */
+    public void setMessageType(ApplicationMessageType messageType) {
+        this.messageType = messageType;
+    }
+
+    /**
+     * @return the id passed at construction time
+     */
+    public long getSchemaId() {
+        return schemaId;
+    }
+
+}
diff --git a/asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/node/AbstractNestedSchemaNode.java b/asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/node/AbstractNestedSchemaNode.java
new file mode 100644
index 0000000..8daeee5
--- /dev/null
+++ b/asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/node/AbstractNestedSchemaNode.java
@@ -0,0 +1,617 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.asterix.schema.node;
+
+import java.io.PrintWriter;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Locale;
+import java.util.TreeSet;
+
+import org.apache.asterix.schema.node.pool.NodesObjectPool;
+import org.apache.asterix.schema.node.type.FlatSchemaNodeType;
+import org.apache.asterix.schema.node.type.NestedSchemaNodeType;
+import org.apache.asterix.schema.utils.ResultSchemaUtils;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+/**
+ * The main class that handles complex types (arrays and records).
+ */
+public abstract class AbstractNestedSchemaNode extends AbstractSchemaNode {
+    private static final long serialVersionUID = 1L;
+
+    protected ISchemaNodeType innerType;
+    protected FlatSchemaNode flatNode;
+    protected HashMap<String, Integer> innerFieldsMap;
+    protected LinkedList<RecordSchemaNode> innerFields;
+    protected ListSchemaNode innerListNode;
+    protected TreeSet<Integer> noneNullableFields;
+
+    //Non-serializable members.
+
+    protected transient NodesObjectPool nodePool;
+    //For printADM purposes
+    private transient HashMap<String, MutableInt> fieldNameCountMap;
+    private transient StringBuilder rootAdmString;
+    private transient boolean addCreateStmt;
+
+    private transient boolean stopInfer = false;
+
+    //For SchemaBuilder purposes
+    private transient boolean seen;
+    private transient boolean inFirstRecord = true;
+
+    //For Asterix Inferencer ONLY
+    private transient boolean tuple = false;
+
+    /**
+     * Adds an element to this List or Record node and folds its type into
+     * the inner type of this node.
+     * <p>
+     * The first element fixes {@code innerType}. Later elements are merged:
+     * records go through {@link #addRecord}, lists through {@link #addList},
+     * and primitives are combined with the current flat node using the
+     * configured type computer. A NULL primitive marks this node as nullable.
+     * 
+     * @param itemNode
+     *            the element (record field, list or primitive) to add
+     */
+    public void addItem(ISchemaNode itemNode) {
+        boolean nullable;
+
+        //Asterix doesn't support List of optional items
+        //(this shortcut is skipped for nodes flagged through setTuple()).
+        if (typeComputer.getName().equals("Asterix") && !tuple) {
+            if (type.equals(NestedSchemaNodeType.LIST) && itemNode.getType() == FlatSchemaNodeType.NULL) {
+                recycleInners();
+                flatNode = FlatSchemaNode.ANY_NODE;
+                innerType = FlatSchemaNodeType.ANY;
+                //Stop inferring type.
+                stopInfer = true;
+                return;
+            }
+        }
+
+        //Make the innerType of the first element type
+        if (innerType == null) {
+            innerType = itemNode.getType();
+        } else if (checkHetro(innerType, itemNode.getType()) && computeHeterogenousType(itemNode)) {
+            /*
+             * computeHeterogenousType() returned true: inference stopped and the item is dropped.
+             * If it returns false we proceed to add the element after it changed
+             * this.innerType to the appropriate type.
+             */
+            return;
+        }
+
+        if (itemNode.getType().equals(NestedSchemaNodeType.RECORD))
+            addRecord((RecordSchemaNode) itemNode);
+        else if (itemNode.getType().equals(NestedSchemaNodeType.LIST))
+            addList((ListSchemaNode) itemNode);
+        else {
+            //Primitive item
+            if (flatNode == null) {
+                //First primitive seen: keep it as is.
+                flatNode = (FlatSchemaNode) itemNode;
+                nullable = flatNode.type == FlatSchemaNodeType.NULL;
+            } else {
+                //Nullability must be read before flatNode is replaced by the merged type.
+                nullable = flatNode.type == FlatSchemaNodeType.NULL || itemNode.getType() == FlatSchemaNodeType.NULL;
+                flatNode = FlatSchemaNode.computeFlatType(flatNode.type, itemNode.getType(), typeComputer);
+                innerType = flatNode.type;
+            }
+
+            if (nullable)
+                setAsNullabe();
+        }
+    }
+
+    /**
+     * Registers a list element. The first list becomes the inner list node;
+     * every later list is merged into it.
+     *
+     * @param itemNode
+     *            the list to add or merge
+     */
+    protected void addList(ListSchemaNode itemNode) {
+        if (innerListNode != null) {
+            innerListNode.resolveInternals(itemNode);
+            return;
+        }
+        innerListNode = itemNode;
+    }
+
+    protected void addRecord(RecordSchemaNode recordNode) {
+        if (innerFields == null) {
+            innerFields = new LinkedList<>();
+            innerFieldsMap = new HashMap<>();
+            noneNullableFields = new TreeSet<>();
+        }
+
+        Integer fieldIndex = innerFieldsMap.get(recordNode.getFieldName());
+
+        //if a record with similar name exists
+        if (fieldIndex != null) {
+            RecordSchemaNode node = innerFields.get(fieldIndex);
+            node.resolveInternals(recordNode);
+        } else {
+            innerFields.add(recordNode);
+            innerFieldsMap.put(recordNode.getFieldName(), innerFields.size() - 1);
+        }
+    }
+
+    /**
+     * A type checker
+     * 
+     * @param t0
+     *            first type
+     * @param t1
+     *            second type
+     * @return true
+     *         if t0 and t1 are Nested vs Primitive || List vs Record
+     *         false
+     *         otherwise
+     */
+    protected boolean checkHetro(ISchemaNodeType t0, ISchemaNodeType t1) {
+
+        if ((t0 instanceof FlatSchemaNodeType && t1 instanceof NestedSchemaNodeType)
+                || (t0 instanceof NestedSchemaNodeType && t1 instanceof FlatSchemaNodeType)
+                || (t0 instanceof NestedSchemaNodeType && t1 instanceof NestedSchemaNodeType && !t0.equals(t1)))
+            return true;
+
+        return false;
+    }
+
+    /**
+     * Recycles the nested children (inner list or inner record fields) held
+     * by this node, recursively, and forgets them.
+     * <p>
+     * Safe to call on a node that has no inner type yet: addItem() may call
+     * this before the first element was added. When record fields are
+     * dropped, the name index and the non-nullable index set are cleared as
+     * well so that no stale positions into the emptied field list remain.
+     */
+    protected void recycleInners() {
+        if (innerType == null) {
+            //Nothing has been added yet, so there is nothing to recycle.
+            return;
+        }
+
+        if (innerType.equals(NestedSchemaNodeType.LIST)) {
+            if (innerListNode != null) {
+                innerListNode.recycleInners();
+                innerListNode.recycle();
+                innerListNode = null;
+            }
+        } else if (innerType.equals(NestedSchemaNodeType.RECORD) && innerFields != null) {
+            for (RecordSchemaNode r : innerFields) {
+                r.recycleInners();
+                r.recycle();
+            }
+            innerFields.clear();
+            //Keep the indexes consistent with the now-empty field list.
+            innerFieldsMap.clear();
+            noneNullableFields.clear();
+        }
+    }
+
+    /**
+     * Clears the inferred content of this node and, when an object pool was
+     * set, hands the node back to that pool.
+     */
+    public void recycle() {
+        if (innerFields != null) {
+            innerFields.clear();
+            innerFieldsMap.clear();
+            noneNullableFields.clear();
+        }
+
+        flatNode = null;
+        innerType = null;
+        innerListNode = null;
+
+        if (nodePool == null) {
+            return;
+        }
+        nodePool.recycle(this);
+    }
+
+    /**
+     * Sets the pool this node is returned to by {@link #recycle()}.
+     *
+     * @param nodePool
+     *            the pool, or null for a node that is not pooled
+     */
+    public void setObjectPool(NodesObjectPool nodePool) {
+        this.nodePool = nodePool;
+    }
+
+    /**
+     * @return the type inferred for the content of this node, or null when
+     *         nothing has been added yet
+     */
+    public ISchemaNodeType getInnerType() {
+        return innerType;
+    }
+
+    /**
+     * Merges the schema of another complex node into this one.
+     * When the inner types differ structurally, the heterogeneous type is
+     * computed through the IHeterogeneousTypeComputer implementation;
+     * otherwise records are merged field by field, lists are merged
+     * recursively and primitives are combined into one flat type.
+     * <p>
+     * A node without an inner type (nothing was ever added to it) is treated
+     * as NULL-typed. This applies to both sides, so {@code itemNode} may be
+     * modified by this call.
+     * 
+     * @param itemNode
+     *            the node to merge in; must be an AbstractNestedSchemaNode
+     * @return this node, after the merge
+     */
+    public ISchemaNode resolveInternals(ISchemaNode itemNode) {
+        AbstractNestedSchemaNode nestedNode = (AbstractNestedSchemaNode) itemNode;
+
+        //An empty incoming node counts as NULL. The assignment must target the
+        //incoming node: writing to 'this' would discard the schema inferred so
+        //far and leave nestedNode.flatNode null for the flat merge below.
+        if (!nestedNode.isInnerTypeSet()) {
+            nestedNode.innerType = FlatSchemaNodeType.NULL;
+            nestedNode.flatNode = FlatSchemaNode.NULL_NODE;
+        }
+
+        if (!isInnerTypeSet()) {
+            innerType = FlatSchemaNodeType.NULL;
+            flatNode = FlatSchemaNode.NULL_NODE;
+        }
+
+        if (nestedNode.type.isNullable())
+            setAsNullabe();
+
+        if (checkHetro(innerType, nestedNode.innerType)) {
+            computeHeterogenousType(itemNode);
+        } else {
+            if (innerType.equals(NestedSchemaNodeType.RECORD)) {
+                mergeFields(nestedNode);
+            } else if (innerType.equals(NestedSchemaNodeType.LIST)) {
+                innerListNode.resolveInternals(nestedNode.innerListNode);
+            } else {
+                //Both sides are primitive. Read nullability before innerType is replaced.
+                boolean nullable = innerType == FlatSchemaNodeType.NULL
+                        || nestedNode.innerType == FlatSchemaNodeType.NULL;
+                flatNode = FlatSchemaNode.computeFlatType(flatNode.type, nestedNode.flatNode.type, typeComputer);
+                innerType = flatNode.type;
+                if (nullable)
+                    setAsNullabe();
+            }
+        }
+
+        return this;
+    }
+
+    private void mergeFields(AbstractNestedSchemaNode nestedNode) {
+        noneNullableFields.clear();
+        for (RecordSchemaNode r : nestedNode.innerFields) {
+            AbstractNestedSchemaNode record = getInnerField(r.getFieldName());
+            if (record != null) {
+                record.resolveInternals(r);
+            } else {
+                //That means @this didn't have record @r. @r must be nullable.
+                r.setAsNullabe();
+                addRecord(r);
+            }
+        }
+
+        /*
+         * If nestedNode doesn't contain certain fields in 'this'
+         * Set them as nullables
+         */
+        setInnerNullables();
+    }
+
+    /**
+     * @return the inner list node, or null when no list has been added
+     */
+    public AbstractNestedSchemaNode getInnerListNode() {
+        return innerListNode;
+    }
+
+    /**
+     * Decides what to do when this.innerType and the type of the given node
+     * (or its innerType) differ structurally:
+     * List vs. Record
+     * List vs. Primitive
+     * Record vs. Primitive
+     * <p>
+     * The new inner type is obtained from the type computer and stored in
+     * {@code innerType}. When inference stops, the nested children of this
+     * node and the given node (if nested) are recycled.
+     * 
+     * @param itemNode
+     *            the node whose type conflicts with the current inner type
+     * @return the value of the {@code stopInfer} field after the call; it is
+     *         left unchanged when the computed type is the current inner type
+     */
+    private boolean computeHeterogenousType(ISchemaNode itemNode) {
+        ISchemaNodeType computedType = null;
+
+        //At least one side is primitive.
+        if (itemNode.getType() instanceof FlatSchemaNodeType || innerType instanceof FlatSchemaNodeType)
+            computedType = typeComputer.computePrimitiveWithNestedType(itemNode.getType(), innerType);
+        //If this.type = itemNode.getType(), then they must have different innerType
+        else if (itemNode.getType().equals(type))
+            computedType = typeComputer.computePrimitiveWithNestedType(((AbstractNestedSchemaNode) itemNode).innerType,
+                    innerType);
+        //Both nested and of different kinds.
+        else
+            computedType = typeComputer.computeNestedType(itemNode.getType(), innerType);
+
+        /* if we inferred a different type than the current innerType, proceed.
+         * Heterogeneous includes NULL... so it's up to IHeterogeneousTypeComputer
+         * to infer the new type after adding NULL value as a different type such as UNION(NULL,Type).
+         */
+
+        if (computedType != innerType) {
+            /* If the innerType is NULL, we should proceed and add itemNode and we don't stop inferring.
+             * otherwise, it depends on the type computer.
+             */
+            stopInfer = typeComputer.stopInferring()
+                    && (innerType != FlatSchemaNodeType.NULL && !computedType.equals(innerType));
+
+            //That means we don't care about the added item
+            //Therefore we will recycle if it's nested.
+            if (stopInfer) {
+                recycleInners();
+                if (computedType instanceof FlatSchemaNodeType)
+                    flatNode = FlatSchemaNode.getFlatNode(computedType);
+                if (itemNode.getType() instanceof NestedSchemaNodeType) {
+                    ((AbstractNestedSchemaNode) itemNode).recycle();
+                }
+            }
+        }
+
+        //Must come after recycleInners(), which still reads the old innerType.
+        innerType = computedType;
+
+        return stopInfer;
+    }
+
+    //Output schema
+
+    protected void printNestedPretty(PrintWriter out, int level) {
+        if (innerType instanceof FlatSchemaNodeType) {
+            flatNode.printPretty(out);
+        } else {
+
+            if (type.isNullable())
+                out.println(innerType.toString().toLowerCase() + " [nullable]");
+            else
+                out.println(innerType.toString().toLowerCase());
+
+            if (innerType.equals(NestedSchemaNodeType.RECORD)) {
+                for (RecordSchemaNode r : innerFields) {
+                    printIndent(out, level + 1);
+                    r.printRecordPretty(out, level + 1);
+                }
+            } else {
+                printIndent(out, level + 1);
+                innerListNode.printListPretty(out, level + 1);
+            }
+        }
+    }
+
+    /**
+     * Prints the whole schema as an indented text tree, starting with a
+     * "root" line, with this node acting as the root.
+     * <p>
+     * NOTE(review): unlike toDummyJSON() and toADM(), there is no guard for
+     * a null innerType here. Confirm callers never print an empty schema.
+     * NOTE(review): the "[nullable]" marker is driven by innerType here but
+     * by type in printNestedPretty(). Confirm this is intended.
+     *
+     * @param out
+     *            destination writer
+     */
+    @Override
+    public void printPretty(PrintWriter out) {
+        if (innerType.isNullable())
+            out.println("root [nullable]");
+        else
+            out.println("root");
+        //if the root is record or list of records print the inner fields
+        if (innerType.equals(NestedSchemaNodeType.RECORD)) {
+            for (RecordSchemaNode r : innerFields) {
+                printIndent(out, 1);
+                r.printRecordPretty(out, 1);
+            }
+        } else { //List
+            printIndent(out, 1);
+            out.println(getType().toString().toLowerCase());
+            //Any non-record inner type is assumed to mean this node is a ListSchemaNode.
+            ListSchemaNode listNode = (ListSchemaNode) this;
+            printIndent(out, 2);
+            listNode.printListPretty(out, 2);
+        }
+    }
+
+    /**
+     * Prints this node as the root of an indented text tree.
+     * <p>
+     * NOTE(review): despite the name, no JSON is produced. The output has
+     * the same tree layout as printPretty(), except that the branch is
+     * chosen on {@code type} instead of {@code innerType} and the root line
+     * never carries a nullable marker. Confirm whether this method is still
+     * needed.
+     *
+     * @param out
+     *            destination writer
+     */
+    protected void printNestedDummyJSON(PrintWriter out) {
+        out.println("root");
+        //if the root is record or list of records print the inner fields
+        if (type.equals(NestedSchemaNodeType.RECORD)) {
+            for (RecordSchemaNode r : innerFields) {
+                printIndent(out, 1);
+                r.printRecordPretty(out, 1);
+            }
+        } else { //List
+            printIndent(out, 1);
+            out.println(getType().toString().toLowerCase());
+            ListSchemaNode listNode = (ListSchemaNode) this;
+            printIndent(out, 2);
+            listNode.printListPretty(out, 2);
+        }
+    }
+
+    /**
+     * Appends a dummy JSON value describing this node to the given array:
+     * a JSON object built from the inner fields when this node is a record,
+     * otherwise whatever the list node produces. Nothing is appended when no
+     * inner type has been inferred.
+     *
+     * @param tuple
+     *            the array to append to
+     * @throws JSONException
+     *             if building the JSON value fails
+     */
+    @Override
+    public void toDummyJSON(JSONArray tuple) throws JSONException {
+        if (innerType == null) {
+            return;
+        }
+
+        if (!type.equals(NestedSchemaNodeType.RECORD)) {
+            //List
+            final ListSchemaNode asList = (ListSchemaNode) this;
+            tuple.put(asList.listToDummyJSON());
+            return;
+        }
+
+        //Record: collect every inner field into one object.
+        final JSONObject fields = new JSONObject();
+        for (RecordSchemaNode field : innerFields) {
+            field.recordToDummyJSON(fields);
+        }
+        tuple.put(fields);
+    }
+
+    /**
+     * Renders the schema rooted at this node as ADM type definitions.
+     * With the "Asterix" type computer each type is emitted as a
+     * {@code create type ... as open} statement; otherwise a plain
+     * {@code Name{...}} form is used.
+     * <p>
+     * Type names are lower-cased with {@link Locale#ROOT} so the generated
+     * text does not depend on the default locale of the JVM.
+     *
+     * @return the ADM text, or null when no inner type has been inferred
+     */
+    @Override
+    public String toADM() {
+        if (innerType == null) {
+            return null;
+        }
+        addCreateStmt = typeComputer.getName().equals("Asterix");
+
+        //Nested root: build the definitions recursively.
+        if (innerType instanceof NestedSchemaNodeType) {
+            rootAdmString = new StringBuilder();
+            fieldNameCountMap = new HashMap<>();
+            creatAdmType("Dataset", rootAdmString, this);
+            return rootAdmString.toString();
+        }
+
+        //Primitive root: a single type with one "item" field.
+        final String flatTypeName = innerType.toString().toLowerCase(Locale.ROOT);
+        if (addCreateStmt) {
+            //ANY cannot be declared, so the open type is left without fields.
+            return "create type DatasetType1 as open {\n" + ResultSchemaUtils.SPACE_INDENT
+                    + (innerType == FlatSchemaNodeType.ANY ? "" : "item:" + flatTypeName) + "\n}";
+        }
+        return "DatasetType1{\n" + ResultSchemaUtils.SPACE_INDENT + "item:" + flatTypeName + "\n}";
+    }
+
+    /**
+     * Emits one ADM type definition for the given node into
+     * {@code rootAdmString} and returns the generated type name.
+     * Type names are {@code fieldName + "Type" + n}, where n counts how
+     * often the same field name has been used so far. Definitions of nested
+     * types are emitted before the definition that refers to them.
+     *
+     * @param fieldName
+     *            base name for the generated type
+     * @param admString
+     *            unused; kept for the existing call sites
+     * @param node
+     *            the node to describe
+     * @return the generated type name, or null when the root is a list of
+     *         ANY and nothing is emitted
+     */
+    private String creatAdmType(String fieldName, StringBuilder admString, AbstractNestedSchemaNode node) {
+        MutableInt count = fieldNameCountMap.get(fieldName);
+        if (count == null) {
+            count = new MutableInt(0);
+            fieldNameCountMap.put(fieldName, count);
+        }
+        count.increment();
+        fieldName = fieldName + "Type" + count.getValue();
+        StringBuilder admTypeString = new StringBuilder();
+        int lastTypeEndOffset = -1;
+
+        /* This to handle a corner case where the dataset type looks:
+         * DatasetType {
+         *  list:[]
+         * }
+         * 
+         * i.e there's no name for the list
+         */
+        if (node == this && node.innerType.equals(NestedSchemaNodeType.LIST)) {
+            if (node.innerListNode.innerType == FlatSchemaNodeType.ANY) {
+                return null;
+            }
+            admTypeString.append(ResultSchemaUtils.SPACE_INDENT + "list:");
+            printSchemaAsADM(node.innerListNode, admTypeString);
+            admTypeString.append('\n');
+        } else {
+            for (RecordSchemaNode r : node.innerFields) {
+                if (r.innerType == FlatSchemaNodeType.ANY) {
+                    admTypeString
+                            .append(ResultSchemaUtils.SPACE_INDENT + "//" + r.getFieldName() + " is heterogeneous.\n");
+                } else if (r.innerType == FlatSchemaNodeType.NULL) {
+                    admTypeString
+                            .append(ResultSchemaUtils.SPACE_INDENT + "//" + r.getFieldName() + " is always null.\n");
+                } else if (!printSchemaAsADM(r, admTypeString)) {
+                    //Only remember the offset when the field was really written.
+                    //A dropped field leaves the buffer ending in whatever came
+                    //before it, which may be a comment line rather than a field.
+                    lastTypeEndOffset = admTypeString.length();
+                }
+            }
+        }
+
+        //Delete comma from the last specified type (fields end with ",\n").
+        if (lastTypeEndOffset > 2) {
+            admTypeString.delete(lastTypeEndOffset - 2, lastTypeEndOffset - 1);
+        }
+
+        //Delete last newline.
+        if (admTypeString.length() > 0) {
+            admTypeString.setLength(admTypeString.length() - 1);
+        }
+
+        //Nested definitions were appended during the recursion above, so this
+        //definition lands after the types it refers to.
+        if (addCreateStmt) {
+            rootAdmString.append("create type " + fieldName + " as open {\n");
+        } else {
+            rootAdmString.append(fieldName + "{\n");
+        }
+        rootAdmString.append(admTypeString.toString() + "\n");
+        rootAdmString.append("}\n");
+
+        return fieldName;
+
+    }
+
+    /**
+     * Appends the ADM declaration of one record field or list to the given
+     * buffer.
+     * <p>
+     * A record field is written as {@code name:type,} (the name is quoted
+     * and a nullable field gets a {@code ?} when create statements are
+     * generated). A list is written as {@code [type]}. A list whose inner
+     * type is ANY or NULL is not written at all, and a record field whose
+     * list content turns out to be such a list is removed from the buffer
+     * again.
+     * <p>
+     * Primitive type names are lower-cased with {@link Locale#ROOT} so the
+     * generated text does not depend on the default locale of the JVM.
+     *
+     * @param node
+     *            the field or list to print
+     * @param admString
+     *            buffer of the enclosing type definition
+     * @return true when the node (or the list nested in it) was of type
+     *         ANY/NULL and therefore not declared
+     */
+    private boolean printSchemaAsADM(AbstractNestedSchemaNode node, StringBuilder admString) {
+
+        /*
+         * admTypeName for:
+         * 1- fieldNameType for named record. e.g. "name":{"field1":"String","field2":1}
+         * 2- listType for list of type records e.g [{"field1":"String","field2":1}]
+         */
+        String admTypeName = null;
+        String suffix = null;
+        boolean isAny = false;
+        //Remembered so a field can be rolled back when its list is undeclarable.
+        int prevLength = admString.length();
+        if (node.getType().equals(NestedSchemaNodeType.RECORD)) {
+            RecordSchemaNode recordNode = (RecordSchemaNode) node;
+            admTypeName = recordNode.getFieldName();
+
+            if (addCreateStmt)
+                admString.append(ResultSchemaUtils.SPACE_INDENT + "'" + admTypeName + "'" + ":");
+            else
+                admString.append(ResultSchemaUtils.SPACE_INDENT + admTypeName + ":");
+
+            if (node.getType().isNullable() && addCreateStmt)
+                suffix = "?,\n";
+            else
+                suffix = ",\n";
+        } else if (node.innerType != FlatSchemaNodeType.ANY && node.innerType != FlatSchemaNodeType.NULL) {
+            admTypeName = "list";
+            admString.append("[");
+            if (node.getType().isNullable() && addCreateStmt)
+                suffix = "]?";
+            else
+                suffix = "]";
+
+        } else {
+            isAny = true;
+        }
+
+        if (!isAny) {
+            if (node.innerType instanceof FlatSchemaNodeType) {
+                String flatType = node.innerType.toString().toLowerCase(Locale.ROOT);
+                admString.append(flatType + suffix);
+            } else {
+                if (node.innerType.equals(NestedSchemaNodeType.RECORD)) {
+                    //Nested record: emit its own type definition and refer to it by name.
+                    admString.append(creatAdmType(admTypeName, admString, node) + suffix);
+                } else {
+                    isAny = printSchemaAsADM(node.innerListNode, admString);
+                    if (isAny && node.getType().equals(NestedSchemaNodeType.RECORD)) {
+                        //Drop the "name:" prefix written above.
+                        admString.setLength(prevLength);
+                    } else
+                        admString.append(suffix);
+                }
+            }
+        }
+        return isAny;
+    }
+
+    //SchemaBuilder helpers 
+
+    /**
+     * @return true when type inference for this node has been stopped
+     */
+    public boolean isStopInfer() {
+        return stopInfer;
+    }
+
+    /**
+     * @return the current value of the {@code seen} flag
+     */
+    public boolean isSeen() {
+        return seen;
+    }
+
+    /**
+     * Sets the {@code seen} flag.
+     *
+     * @param seen
+     *            the new flag value
+     */
+    public void setSeen(boolean seen) {
+        this.seen = seen;
+    }
+
+    /**
+     * Re-enables inference and forgets the inner type of this node.
+     * Only {@code stopInfer} and {@code innerType} are reset; the flat node,
+     * the inner list and the inner fields are left as they are.
+     */
+    public void reinit() {
+        stopInfer = false;
+        innerType = null;
+    }
+
+    /**
+     * @return true when an inner type has been inferred for this node
+     */
+    public boolean isInnerTypeSet() {
+        return innerType != null;
+    }
+
+    /**
+     * Flags this node as a tuple. For flagged nodes addItem() does not apply
+     * the Asterix rule that turns a list containing NULL into ANY.
+     */
+    public void setTuple() {
+        tuple = true;
+    }
+
+    /**
+     * Looks up an inner record field by name.
+     * As a side effect, the position of a found field that is not nullable
+     * is added to {@code noneNullableFields}.
+     *
+     * @param fieldName
+     *            name of the field to look up
+     * @return the field, or null when this node has no fields or no field
+     *         of that name
+     */
+    public AbstractNestedSchemaNode getInnerField(String fieldName) {
+        if (innerFields == null) {
+            return null;
+        }
+
+        final Integer position = innerFieldsMap.get(fieldName);
+        if (position == null) {
+            return null;
+        }
+
+        final RecordSchemaNode field = innerFields.get(position);
+        if (!field.type.isNullable()) {
+            noneNullableFields.add(position);
+        }
+        return field;
+    }
+
+    /**
+     * Replaces the type of this node with its nullable variant.
+     */
+    public void setAsNullabe() {
+        type = type.getNullableType();
+    }
+
+    public void setInnerNullables() {
+
+        if (noneNullableFields == null || inFirstRecord)
+            return;
+
+        //Nothing is nullable
+        if (noneNullableFields.size() == innerFields.size()) {
+            noneNullableFields.clear();
+            return;
+        }
+
+        for (int i = 0; i < innerFields.size(); i++) {
+            if (!noneNullableFields.contains(i)) {
+                RecordSchemaNode record = innerFields.get(i);
+                record.type = record.type.getNullableType();
+            }
+        }
+
+        noneNullableFields.clear();
+    }
+
+    /**
+     * A flag setter to determine that the first record in the schema
+     * has been inferred.
+     */
+    public void finisedFirstRecord() {
+        if (inFirstRecord)
+            inFirstRecord = innerType == FlatSchemaNodeType.NULL;
+        else
+            setInnerNullables();
+
+    }
+
+    /**
+     * Tells whether the first record is still being processed.
+     * 
+     * @return the current value of the first-record flag, which
+     *         {@link #finisedFirstRecord()} clears
+     */
+    public boolean isFirstPass() {
+        return inFirstRecord;
+    }
+}
diff --git a/asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/node/AbstractSchemaNode.java b/asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/node/AbstractSchemaNode.java
new file mode 100644
index 0000000..538ff5a
--- /dev/null
+++ b/asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/node/AbstractSchemaNode.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.asterix.schema.node;
+
+import java.io.PrintWriter;
+
+import org.apache.asterix.schema.utils.ResultSchemaUtils;
+
+public abstract class AbstractSchemaNode implements ISchemaNode {
+
+    private static final long serialVersionUID = 1L;
+
+    protected IHeterogeneousTypeComputer typeComputer;
+    protected ISchemaNodeType type;
+
+    protected static void printIndent(PrintWriter out, int level) {
+        if (level == 1) {
+            out.print("|-- ");
+        } else {
+            for (int i = 1; i < level; i++) {
+                out.print("|" + ResultSchemaUtils.SPACE_INDENT);
+            }
+            out.print("|-- ");
+        }
+    }
+
+    @Override
+    public ISchemaNodeType getType() {
+        return type;
+    }
+
+    @Override
+    public void setHeterogeneousTypeComputer(IHeterogeneousTypeComputer typeComputer) {
+        this.typeComputer = typeComputer;
+    }
+
+}
diff --git a/asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/node/FlatSchemaNode.java b/asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/node/FlatSchemaNode.java
new file mode 100644
index 0000000..1ec8282
--- /dev/null
+++ b/asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/node/FlatSchemaNode.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.asterix.schema.node;
+
+import java.io.PrintWriter;
+
+import org.apache.asterix.schema.node.type.FlatSchemaNodeType;
+import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
+import org.json.JSONArray;
+
+/**
+ * Schema node for a primitive value; the only state it carries is its type.
+ * We named the class as "Flat" to be consistent with the "APrint" classes.
+ */
+public class FlatSchemaNode extends AbstractSchemaNode {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final FlatSchemaNode STRING_NODE = new FlatSchemaNode(FlatSchemaNodeType.STRING);
+    public static final FlatSchemaNode DOUBLE_NODE = new FlatSchemaNode(FlatSchemaNodeType.DOUBLE);
+    public static final FlatSchemaNode LONG_NODE = new FlatSchemaNode(FlatSchemaNodeType.INT64);
+    public static final FlatSchemaNode BOOLEAN_NODE = new FlatSchemaNode(FlatSchemaNodeType.BOOLEAN);
+    public static final FlatSchemaNode NULL_NODE = new FlatSchemaNode(FlatSchemaNodeType.NULL);
+    public static final FlatSchemaNode ANY_NODE = new FlatSchemaNode(FlatSchemaNodeType.ANY);
+
+    /**
+     * Creates a primitive node of the given type.
+     *
+     * @param type the type reported by {@link #getType()} and printed by this node
+     */
+    public FlatSchemaNode(ISchemaNodeType type) {
+        this.type = type;
+    }
+
+    @Override
+    public String toString() {
+        return type.toString();
+    }
+
+    /**
+     * Returns the shared constant node for a primitive type.
+     *
+     * @param type must be a {@link FlatSchemaNodeType}; anything else fails the cast
+     * @return the matching {@code *_NODE} constant
+     */
+    public static FlatSchemaNode getFlatNode(ISchemaNodeType type) {
+        FlatSchemaNodeType flatType = (FlatSchemaNodeType) type;
+        switch (flatType) {
+            case STRING:
+                return STRING_NODE;
+            case BOOLEAN:
+                return BOOLEAN_NODE;
+            case DOUBLE:
+                return DOUBLE_NODE;
+            case INT64:
+                return LONG_NODE;
+            case NULL:
+                return NULL_NODE;
+            case ANY:
+                return ANY_NODE;
+        }
+        return ANY_NODE;
+    }
+
+    /**
+     * Get the required type of two different primitive types
+     *
+     * @param type1 first primitive type
+     * @param type2 second primitive type
+     * @param typeComputer consulted only when the two types differ
+     * @return the shared node for the resolved type
+     */
+    public static FlatSchemaNode computeFlatType(ISchemaNodeType type1, ISchemaNodeType type2,
+            IHeterogeneousTypeComputer typeComputer) {
+        if (type1 == type2) {
+            return getFlatNode(type1);
+        }
+        FlatSchemaNodeType thisType = (FlatSchemaNodeType) type1;
+        FlatSchemaNodeType otherType = (FlatSchemaNodeType) type2;
+        FlatSchemaNodeType newType = (FlatSchemaNodeType) typeComputer.computePrimitiveType(thisType, otherType);
+        return getFlatNode(newType);
+    }
+
+    @Override
+    public void printPretty(PrintWriter out) {
+        // Locale.ROOT keeps the output stable; a Turkish default locale would map 'I' to a dotless i.
+        out.print(type.toString().toLowerCase(java.util.Locale.ROOT));
+    }
+
+    @Override
+    public void toDummyJSON(JSONArray tuple) {
+        throw new NotImplementedException("toDummyJSON shouldn't be called from FlatSchemaNode");
+    }
+
+    @Override
+    public String toADM() {
+        throw new NotImplementedException("toADM shouldn't be called from FlatSchemaNode");
+    }
+
+}
diff --git a/asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/node/IHeterogeneousTypeComputer.java b/asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/node/IHeterogeneousTypeComputer.java
new file mode 100644
index 0000000..93066db
--- /dev/null
+++ b/asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/node/IHeterogeneousTypeComputer.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.asterix.schema.node;
+
+import java.io.Serializable;
+
+/**
+ * The main interface that helps SchemaBuilder to determine the appropriate type
+ * of fields having the same name and different types.
+ *
+ * @author wail
+ */
+public interface IHeterogeneousTypeComputer extends Serializable {
+
+    /**
+     * Tells the SchemaBuilder whether to stop building a schema.
+     * Asterix sometimes infers a type as ANY, in which case there is
+     * no need to infer a schema for it.
+     *
+     * @return presumably {@code true} when inference should stop (no implementation is visible here)
+     */
+    boolean stopInferring();
+
+    /**
+     * Names this type computer.
+     *
+     * @return the name of the type computer
+     */
+    String getName();
+
+    /**
+     * Resolves two primitive types, such as String and Float, into one.
+     *
+     * @param type1 first primitive type
+     * @param type2 second primitive type
+     * @return the type to use for both; FlatSchemaNode casts it to FlatSchemaNodeType
+     */
+    ISchemaNodeType computePrimitiveType(ISchemaNodeType type1, ISchemaNodeType type2);
+
+    /**
+     * Resolves two complex types, such as List and Record, into one.
+     *
+     * @param type1 first complex type
+     * @param type2 second complex type
+     * @return the type to use for both
+     */
+    ISchemaNodeType computeNestedType(ISchemaNodeType type1, ISchemaNodeType type2);
+
+    /**
+     * Resolves a complex type against a primitive one, such as String and List.
+     *
+     * @param type1 one of the two conflicting types
+     * @param type2 the other conflicting type
+     * @return the type to use for both
+     */
+    ISchemaNodeType computePrimitiveWithNestedType(ISchemaNodeType type1, ISchemaNodeType type2);
+
+}
diff --git a/asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/node/ISchemaNode.java b/asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/node/ISchemaNode.java
new file mode 100644
index 0000000..70da8dd
--- /dev/null
+++ b/asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/node/ISchemaNode.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.asterix.schema.node;
+
+import java.io.PrintWriter;
+import java.io.Serializable;
+
+import org.json.JSONArray;
+import org.json.JSONException;
+
+/**
+ * Interface for the schema nodes implementations.
+ *
+ * @author wail
+ */
+public interface ISchemaNode extends Serializable {
+
+    /**
+     * Reports the type of the node.
+     *
+     * @return the node's type
+     */
+    ISchemaNodeType getType();
+
+    /**
+     * IHeterogeneousTypeComputer implementation setter.
+     *
+     * @param typeComputer the type computer this node should use
+     */
+    void setHeterogeneousTypeComputer(IHeterogeneousTypeComputer typeComputer);
+
+    /**
+     * Prints the schema in a human-consumable form.
+     *
+     * @param out writer that receives the output
+     */
+    void printPretty(PrintWriter out);
+
+    /**
+     * Renders the schema as dummy JSON that represents the schema.
+     *
+     * @param tuple presumably the array that receives the dummy value (FlatSchemaNode rejects this call)
+     * @throws JSONException if the JSON cannot be built
+     */
+    void toDummyJSON(JSONArray tuple) throws JSONException;
+
+    /**
+     * Renders the schema as ADM-CREATE DDL.
+     *
+     * @return the DDL text
+     */
+    String toADM();
+
+}
diff --git a/asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/node/ISchemaNodeType.java b/asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/node/ISchemaNodeType.java
new file mode 100644
index 0000000..7d85422
--- /dev/null
+++ b/asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/node/ISchemaNodeType.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.asterix.schema.node;
+
+/**
+ * Common interface of the schema node type enums.
+ *
+ * @author wail
+ */
+public interface ISchemaNodeType {
+
+    /**
+     * Compares the actual type while ignoring nullability, e.g. NestedSchemaNodeType treats
+     * RECORD and OPT_RECORD as equal. This overloads, not overrides, Object.equals(Object).
+     *
+     * @param type the type to compare with
+     * @return {@code true} if both denote the same actual type
+     */
+    boolean equals(ISchemaNodeType type);
+
+    /**
+     * Tells whether this type is nullable.
+     *
+     * @return {@code true} for a nullable type
+     */
+    boolean isNullable();
+
+    /**
+     * Gives the nullable type of this type.
+     *
+     * @return the nullable counterpart of this type
+     */
+    ISchemaNodeType getNullableType();
+}
diff --git a/asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/node/ListSchemaNode.java b/asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/node/ListSchemaNode.java
new file mode 100644
index 0000000..c1ee7a6
--- /dev/null
+++ b/asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/node/ListSchemaNode.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.asterix.schema.node;
+
+import java.io.PrintWriter;
+
+import org.apache.asterix.schema.node.type.FlatSchemaNodeType;
+import org.apache.asterix.schema.node.type.NestedSchemaNodeType;
+import org.apache.asterix.schema.utils.ResultSchemaUtils;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+/**
+ * List Schema Node
+ */
+public class ListSchemaNode extends AbstractNestedSchemaNode {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Builds a node whose type is {@link NestedSchemaNodeType#LIST}.
+     */
+    public ListSchemaNode() {
+        type = NestedSchemaNodeType.LIST;
+    }
+
+    /**
+     * Prints the "item:" label followed by the nested layout of the item type.
+     *
+     * @param out destination writer
+     * @param level nesting depth handed to {@code printNestedPretty}
+     */
+    public void printListPretty(PrintWriter out, int level) {
+        out.print("item:");
+        printNestedPretty(out, level);
+        if (innerType instanceof FlatSchemaNodeType) {
+            out.println(type.isNullable() ? " [nullable]" : "");
+        }
+    }
+
+    /**
+     * Builds a one-element sample array that mirrors the item type of this list.
+     *
+     * @return the sample array; it stays empty when the item type is {@code ANY}
+     * @throws JSONException if the sample value cannot be built
+     */
+    public JSONArray listToDummyJSON() throws JSONException {
+        final JSONArray sample = new JSONArray();
+        if (innerType == NestedSchemaNodeType.RECORD) {
+            final JSONObject item = new JSONObject();
+            for (RecordSchemaNode field : innerFields) {
+                field.recordToDummyJSON(item);
+            }
+            sample.put(item);
+        } else if (!(innerType instanceof FlatSchemaNodeType)) {
+            sample.put(innerListNode.listToDummyJSON());
+        } else {
+            switch ((FlatSchemaNodeType) innerType) {
+                case STRING:
+                    sample.put(ResultSchemaUtils.STRING_DUMMY_VALUE);
+                    break;
+                case INT64:
+                    sample.put(ResultSchemaUtils.LONG_DUMMY_VALUE);
+                    break;
+                case DOUBLE:
+                    sample.put(ResultSchemaUtils.DOUBLE_DUMMY_VALUE);
+                    break;
+                case BOOLEAN:
+                    sample.put(ResultSchemaUtils.BOOLEAN_DUMMY_VALUE);
+                    break;
+                case NULL:
+                    sample.put(ResultSchemaUtils.NULL_DUMMY_VALUE);
+                    break;
+                case ANY:
+                    break;
+            }
+        }
+        return sample;
+    }
+
+}
diff --git a/asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/node/RecordSchemaNode.java b/asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/node/RecordSchemaNode.java
new file mode 100644
index 0000000..ffc0c92
--- /dev/null
+++ b/asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/node/RecordSchemaNode.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.asterix.schema.node;
+
+import java.io.PrintWriter;
+
+import org.apache.asterix.schema.node.type.FlatSchemaNodeType;
+import org.apache.asterix.schema.node.type.NestedSchemaNodeType;
+import org.apache.asterix.schema.utils.ResultSchemaUtils;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+/**
+ * Record Schema Node
+ */
+public class RecordSchemaNode extends AbstractNestedSchemaNode {
+
+    private static final long serialVersionUID = 1L;
+
+    private String fieldName;
+
+    /**
+     * Builds a node whose type is {@link NestedSchemaNodeType#RECORD}.
+     */
+    public RecordSchemaNode() {
+        type = NestedSchemaNodeType.RECORD;
+    }
+
+    public String getFieldName() {
+        return fieldName;
+    }
+
+    public void setFieldName(String fieldName) {
+        this.fieldName = fieldName;
+    }
+
+    @Override
+    public void reinit() {
+        super.reinit();
+        fieldName = null;
+    }
+
+    /**
+     * Prints the field name, a colon, and the nested layout of the field's type.
+     * @param out destination writer
+     * @param level nesting depth handed to {@code printNestedPretty}
+     */
+    public void printRecordPretty(PrintWriter out, int level) {
+        out.print(fieldName + ":");
+        printNestedPretty(out, level);
+        if (innerType instanceof FlatSchemaNodeType) {
+            out.println(type.isNullable() ? " [nullable]" : "");
+        }
+    }
+
+    /**
+     * Adds this field, with a sample value that mirrors its type, to the given JSON object.
+     *
+     * @param record object that receives the field; nothing is added when the type is {@code ANY}
+     * @throws JSONException if the sample value cannot be stored
+     */
+    public void recordToDummyJSON(JSONObject record) throws JSONException {
+        if (innerType == NestedSchemaNodeType.RECORD) {
+            final JSONObject nested = new JSONObject();
+            for (RecordSchemaNode child : innerFields) {
+                child.recordToDummyJSON(nested);
+            }
+            record.put(fieldName, nested);
+        } else if (!(innerType instanceof FlatSchemaNodeType)) {
+            record.put(fieldName, innerListNode.listToDummyJSON());
+        } else {
+            switch ((FlatSchemaNodeType) innerType) {
+                case STRING:
+                    record.put(fieldName, ResultSchemaUtils.STRING_DUMMY_VALUE);
+                    break;
+                case INT64:
+                    record.put(fieldName, ResultSchemaUtils.LONG_DUMMY_VALUE);
+                    break;
+                case DOUBLE:
+                    record.put(fieldName, ResultSchemaUtils.DOUBLE_DUMMY_VALUE);
+                    break;
+                case BOOLEAN:
+                    record.put(fieldName, ResultSchemaUtils.BOOLEAN_DUMMY_VALUE);
+                    break;
+                case NULL:
+                    record.put(fieldName, ResultSchemaUtils.NULL_DUMMY_VALUE);
+                    break;
+                case ANY:
+                    break;
+            }
+        }
+    }
+}
diff --git a/asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/node/pool/NodesObjectPool.java b/asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/node/pool/NodesObjectPool.java
new file mode 100644
index 0000000..fe298b7
--- /dev/null
+++ b/asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/node/pool/NodesObjectPool.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.asterix.schema.node.pool;
+
+import java.util.LinkedList;
+import java.util.Queue;
+
+import org.apache.asterix.schema.node.AbstractNestedSchemaNode;
+import org.apache.asterix.schema.node.IHeterogeneousTypeComputer;
+import org.apache.asterix.schema.node.ListSchemaNode;
+import org.apache.asterix.schema.node.RecordSchemaNode;
+import org.apache.asterix.schema.node.type.NestedSchemaNodeType;
+
+/**
+ * Object pool for SchemaNode.
+ */
+public class NodesObjectPool {
+
+    private final Queue<RecordSchemaNode> recordPool = new LinkedList<>();
+    private final Queue<ListSchemaNode> listPool = new LinkedList<>();
+
+    private final IHeterogeneousTypeComputer typeComputer;
+
+    /**
+     * Creates NodeObjectPool.
+     *
+     * @param typeComputer handed to every node this pool creates
+     */
+    public NodesObjectPool(IHeterogeneousTypeComputer typeComputer) {
+        this.typeComputer = typeComputer;
+    }
+
+    /**
+     * Puts a node back so that a later get call can reuse it; the node is stored as it is.
+     * @param nestedNode node to pool; anything not typed LIST is stored as a record node
+     */
+    public void recycle(AbstractNestedSchemaNode nestedNode) {
+        if (nestedNode.getType().equals(NestedSchemaNodeType.LIST)) {
+            listPool.add((ListSchemaNode) nestedNode);
+        } else {
+            recordPool.add((RecordSchemaNode) nestedNode);
+        }
+    }
+
+    /**
+     * Get a SchemaNode of type Record.
+     *
+     * @return a reinitialised pooled node, or a new node wired to this pool and its type computer
+     */
+    public RecordSchemaNode getRecrodNode() {
+        RecordSchemaNode recordNode;
+        if (!recordPool.isEmpty()) {
+            recordNode = recordPool.remove();
+            recordNode.reinit();
+        } else {
+            recordNode = new RecordSchemaNode();
+            recordNode.setObjectPool(this);
+            recordNode.setHeterogeneousTypeComputer(typeComputer);
+        }
+        return recordNode;
+    }
+
+    /**
+     * Get a SchemaNode of type List. Pooled nodes are reinitialised first, as in
+     * {@link #getRecrodNode()}, because recycle() stores nodes without resetting them.
+     * @return a reinitialised pooled node, or a new node wired to this pool and its type computer
+     */
+    public ListSchemaNode getListNode() {
+        ListSchemaNode listNode;
+        if (!listPool.isEmpty()) {
+            listNode = listPool.remove();
+            listNode.reinit();
+        } else {
+            listNode = new ListSchemaNode();
+            listNode.setObjectPool(this);
+            listNode.setHeterogeneousTypeComputer(typeComputer);
+        }
+        return listNode;
+    }
+
+    /**
+     * Reset object pool.
+     */
+    public void clear() {
+        listPool.clear();
+        recordPool.clear();
+    }
+
+}
diff --git a/asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/node/type/FlatSchemaNodeType.java b/asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/node/type/FlatSchemaNodeType.java
new file mode 100644
index 0000000..290ed95
--- /dev/null
+++ b/asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/node/type/FlatSchemaNodeType.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.asterix.schema.node.type;
+
+import org.apache.asterix.schema.node.ISchemaNodeType;
+
+/**
+ * Primitive type Enum
+ *
+ * @author wail
+ */
+public enum FlatSchemaNodeType implements ISchemaNodeType {
+    STRING,
+    INT64,
+    DOUBLE,
+    BOOLEAN,
+    NULL,
+    ANY;
+
+    // Every primitive type maps to NULL as its nullable form.
+    @Override
+    public ISchemaNodeType getNullableType() {
+        return NULL;
+    }
+
+    // Only NULL itself counts as nullable.
+    @Override
+    public boolean isNullable() {
+        return NULL == this;
+    }
+
+    // Primitive types have no optional variants, so identity is the whole comparison.
+    @Override
+    public boolean equals(ISchemaNodeType type) {
+        return type == this;
+    }
+}
diff --git a/asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/node/type/NestedSchemaNodeType.java b/asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/node/type/NestedSchemaNodeType.java
new file mode 100644
index 0000000..d0d891f
--- /dev/null
+++ b/asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/node/type/NestedSchemaNodeType.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.asterix.schema.node.type;
+
+import org.apache.asterix.schema.node.ISchemaNodeType;
+
+/**
+ * Complex type Enum; the OPT_ constants are the nullable variants of RECORD and LIST.
+ * equals() ignores nullability, so RECORD equals OPT_RECORD and LIST equals OPT_LIST.
+ * @author wail
+ */
+public enum NestedSchemaNodeType implements ISchemaNodeType {
+    RECORD,
+    LIST,
+    OPT_RECORD,
+    OPT_LIST;
+
+    @Override
+    public boolean equals(ISchemaNodeType type) {
+        // instanceof is false for null and for non-nested types, so neither reaches the cast.
+        if (!(type instanceof NestedSchemaNodeType)) {
+            return false;
+        }
+        NestedSchemaNodeType nestedType = (NestedSchemaNodeType) type;
+        return (this.isRecord() && nestedType.isRecord()) || (this.isList() && nestedType.isList());
+    }
+
+    private boolean isRecord() {
+        return this == RECORD || this == OPT_RECORD;
+    }
+
+    private boolean isList() {
+        return this == LIST || this == OPT_LIST;
+    }
+
+    @Override
+    public ISchemaNodeType getNullableType() {
+        if (this == LIST) {
+            return OPT_LIST;
+        }
+        return this == RECORD ? OPT_RECORD : this;
+    }
+
+    @Override
+    public boolean isNullable() {
+        return this == OPT_LIST || this == OPT_RECORD;
+    }
+
+}
diff --git a/asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/service/SchemaDircetoryService.java b/asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/service/SchemaDircetoryService.java
new file mode 100644
index 0000000..09271da
--- /dev/null
+++ b/asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/service/SchemaDircetoryService.java
@@ -0,0 +1,189 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.asterix.schema.service;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.messaging.api.ICCMessageBroker;
+import org.apache.asterix.schema.message.SchemaMessage;
+import org.apache.asterix.schema.message.SchemaRegisterMessage;
+import org.apache.asterix.schema.node.ISchemaNode;
+import org.apache.hyracks.api.messages.IMessageBroker;
+
+/**
+ * A directory that maps the inferred schema with JobID.
+ * <p>
+ * The three maps are guarded by this object's monitor; {@link #getSchemas(long)} waits on that
+ * monitor and {@link #populateSchema(SchemaMessage)} notifies it.
+ *
+ * @author wail
+ */
+public class SchemaDircetoryService {
+
+    public static final SchemaDircetoryService INSTANCE = new SchemaDircetoryService();
+
+    private final AtomicLong schemaId;
+    private volatile ICCMessageBroker ccMessageBroker;
+    // The maps below must only be touched while holding this object's monitor.
+    private final HashMap<Long, Long> jobSchemaIdMap;
+    private final HashMap<Long, List<SchemaDirectoryRecord>> schemaLocations;
+    private final HashMap<Long, List<ISchemaNode>> jobSchemas;
+
+    private SchemaDircetoryService() {
+        jobSchemaIdMap = new HashMap<>();
+        schemaLocations = new HashMap<>();
+        jobSchemas = new HashMap<>();
+        schemaId = new AtomicLong(0);
+    }
+
+    /**
+     * Every partition generates a schema and therefore they need to inform the directory that
+     * this partition has a schema.
+     *
+     * @param message registration message; its schema id is the directory key
+     * @param nodeId node to ask for the schema later, see {@link #prepareSchemas(long)}
+     */
+    public synchronized void registerSchemaLocation(SchemaRegisterMessage message, String nodeId) {
+        List<SchemaDirectoryRecord> directoryRecords = schemaLocations.get(message.getSchemaId());
+        if (directoryRecords == null) {
+            directoryRecords = new LinkedList<>();
+            schemaLocations.put(message.getSchemaId(), directoryRecords);
+        }
+        directoryRecords.add(new SchemaDirectoryRecord(message, nodeId));
+        notifyAll();
+    }
+
+    /**
+     * Map the schemaId with jobId.
+     *
+     * @param jobId job to associate
+     * @param schemaId schema id that the job's partitions register under
+     */
+    public synchronized void setJobSchemaId(long jobId, long schemaId) {
+        jobSchemaIdMap.put(jobId, schemaId);
+        notifyAll();
+    }
+
+    /**
+     * Blocks until every registered partition of the job has delivered its schema, then returns
+     * the schemas and forgets everything recorded for the job.
+     *
+     * @param jobId job whose schemas were requested through {@link #prepareSchemas(long)}
+     * @return one schema per registered partition
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     * @throws IllegalStateException if no collection is in progress for the job (previously an NPE)
+     */
+    public synchronized List<ISchemaNode> getSchemas(long jobId) throws InterruptedException {
+        final Long jobSchemaId = jobSchemaIdMap.get(jobId);
+        while (true) {
+            final List<ISchemaNode> schemas = jobSchemas.get(jobSchemaId);
+            final List<SchemaDirectoryRecord> locations = schemaLocations.get(jobSchemaId);
+            if (schemas == null || locations == null) {
+                throw new IllegalStateException("No schemas are being collected for job " + jobId);
+            }
+            if (schemas.size() == locations.size()) {
+                jobSchemas.remove(jobSchemaId);
+                jobSchemaIdMap.remove(jobId);
+                schemaLocations.remove(jobSchemaId);
+                return schemas;
+            }
+            // wait() releases the monitor so populateSchema() can add a schema and notify.
+            wait();
+        }
+    }
+
+    /**
+     * This method receives messages from {@link org.apache.asterix.common.messaging.api.ICCMessageBroker}
+     * and populate the schema nodes.
+     *
+     * @param message schema sent by one partition
+     * @throws IllegalStateException if no collection is in progress for the message's schema id
+     */
+    public synchronized void populateSchema(SchemaMessage message) {
+        final long messageSchemaId = message.getSchemaId();
+        final List<ISchemaNode> schemas = jobSchemas.get(messageSchemaId);
+        if (schemas == null) {
+            throw new IllegalStateException("No schemas are being collected for schema " + messageSchemaId);
+        }
+        schemas.add(message.getSchema());
+        // Waiters re-check their own completion condition, so waking all of them is safe.
+        notifyAll();
+    }
+
+    /**
+     * The directory will ask all NCs that have schema to send them.
+     *
+     * @param jobId job whose schemas should be collected
+     * @return {@code true} if at least one partition registered a schema for the job
+     * @throws AsterixException wrapping any failure while messaging an NC
+     */
+    public synchronized boolean prepareSchemas(long jobId) throws AsterixException {
+        final Long jobSchemaId = jobSchemaIdMap.get(jobId);
+        final List<SchemaDirectoryRecord> directoryRecords = schemaLocations.get(jobSchemaId);
+        final boolean nonEmpty = directoryRecords != null;
+        if (nonEmpty) {
+            jobSchemas.put(jobSchemaId, new ArrayList<>(directoryRecords.size()));
+            try {
+                for (SchemaDirectoryRecord record : directoryRecords) {
+                    ccMessageBroker.sendApplicationMessageToNC(record.getMessage(), record.getNodeId());
+                }
+            } catch (Exception e) {
+                throw new AsterixException(e);
+            }
+        }
+        notifyAll();
+        return nonEmpty;
+    }
+
+    /**
+     * Sets the broker used to message NCs; it must implement {@link ICCMessageBroker}.
+     */
+    public void setMessageBroker(IMessageBroker ccMessageBroker) {
+        this.ccMessageBroker = (ICCMessageBroker) ccMessageBroker;
+    }
+
+    /**
+     * Generate unique schema ID for each job.
+     *
+     * @return the next id; ids start at 0 and grow by one per call
+     */
+    public long generateSchemaId() {
+        return schemaId.getAndIncrement();
+    }
+
+    /** One registered partition: its registration message and the node to send it back to. */
+    private static final class SchemaDirectoryRecord {
+        private final SchemaRegisterMessage message;
+        private final String nodeId;
+
+        SchemaDirectoryRecord(SchemaRegisterMessage message, String nodeId) {
+            this.message = message;
+            this.nodeId = nodeId;
+        }
+
+        SchemaRegisterMessage getMessage() {
+            return message;
+        }
+
+        String getNodeId() {
+            return nodeId;
+        }
+    }
+}
diff --git a/asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/service/SchemaReporterService.java b/asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/service/SchemaReporterService.java
new file mode 100644
index 0000000..dfc30fc
--- /dev/null
+++ b/asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/service/SchemaReporterService.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.schema.service;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
+import org.apache.asterix.common.messaging.api.IApplicationMessage.ApplicationMessageType;
+import org.apache.asterix.common.messaging.api.IApplicationMessageCallback;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.asterix.schema.message.SchemaMessage;
+import org.apache.asterix.schema.message.SchemaRegisterMessage;
+import org.apache.asterix.schema.node.ISchemaNode;
+import org.apache.asterix.schema.utils.ResultSchemaUtils;
+import org.apache.hyracks.api.messages.IMessageBroker;
+
+/**
+ * Service class for each NC to report its inferred schema for each job.
+ * <p>
+ * Schemas are buffered per schema ID by {@link #registerSchema(long, ISchemaNode)}. When a
+ * {@link SchemaRegisterMessage} response is delivered, the schemas buffered for its ID are
+ * merged into one and sent through the message broker as a {@link SchemaMessage}.
+ */
+public class SchemaReporterService implements IApplicationMessageCallback {
+    private static final Logger LOGGER = Logger.getLogger(SchemaReporterService.class.getName());
+    public static final SchemaReporterService INSTANCE = new SchemaReporterService();
+
+    private INCMessageBroker messageBroker;
+    // Schemas buffered so far, keyed by schema ID. Only touched from synchronized methods.
+    private final HashMap<Long, List<ISchemaNode>> schemasMap;
+
+    private SchemaReporterService() {
+        schemasMap = new HashMap<>();
+    }
+
+    /**
+     * Sends a SCHEMA_REGISTER_REQUEST for the given schema ID through the message broker,
+     * registering this service as the callback for the response.
+     *
+     * @param schemaId
+     *            ID of the schema to register
+     * @throws AsterixException
+     *             wrapping any exception raised while building or sending the message
+     */
+    private void sendRegisterationMessage(long schemaId) throws AsterixException {
+        try {
+            SchemaRegisterMessage message = new SchemaRegisterMessage(schemaId,
+                    ApplicationMessageType.SCHEMA_REGISTER_REQUEST);
+            messageBroker.sendMessage(message, this);
+        } catch (Exception e) {
+            throw new AsterixException(e);
+        }
+    }
+
+    /**
+     * Buffers a schema for the given schema ID and, the first time an ID is seen, tells
+     * SchemaDirectoryService that this NC has a schema by sending a registration message.
+     * <p>
+     * A failure to send the registration message is logged and not rethrown; the schema stays
+     * buffered either way.
+     *
+     * @param schemaId
+     *            ID the schema belongs to
+     * @param tuple
+     *            the schema to buffer
+     */
+    public synchronized void registerSchema(long schemaId, ISchemaNode tuple) {
+        List<ISchemaNode> schemas = schemasMap.get(schemaId);
+        if (schemas == null) {
+            // First schema for this ID: start the buffer and announce it once.
+            schemas = new LinkedList<>();
+            schemas.add(tuple);
+            schemasMap.put(schemaId, schemas);
+            try {
+                sendRegisterationMessage(schemaId);
+            } catch (AsterixException e) {
+                // Log through the class logger and attach the exception so the cause is not lost.
+                LOGGER.log(Level.WARNING, "couldn't send message", e);
+            }
+            return;
+        }
+
+        schemas.add(tuple);
+
+        // NOTE(review): nothing in this class waits on this monitor; kept for any external
+        // waiter on INSTANCE - confirm whether it is still needed.
+        notifyAll();
+    }
+
+    /**
+     * Installs the message broker used to send schema messages.
+     *
+     * @param messageBroker
+     *            the broker to install; it is cast to {@link INCMessageBroker}
+     */
+    public void setMessageBroker(IMessageBroker messageBroker) {
+        this.messageBroker = (INCMessageBroker) messageBroker;
+    }
+
+    /**
+     * Handles the response to a registration message: removes the schemas buffered for the
+     * message's schema ID, merges them and sends the result as a {@link SchemaMessage}.
+     * <p>
+     * If nothing is buffered for the ID, the response is logged and ignored instead of failing
+     * with a {@link NullPointerException} inside the merge.
+     *
+     * @param message
+     *            the delivered response; it is cast to {@link SchemaRegisterMessage}
+     */
+    @Override
+    public synchronized void deliverMessageResponse(IApplicationMessage message) {
+        SchemaRegisterMessage schemaRegisterMessage = (SchemaRegisterMessage) message;
+        long schemaId = schemaRegisterMessage.getSchemaId();
+        List<ISchemaNode> schemas = schemasMap.remove(schemaId);
+        if (schemas == null) {
+            LOGGER.log(Level.WARNING, "no schema is buffered for schema id " + schemaId + "; nothing to report.");
+            return;
+        }
+        ISchemaNode schema = ResultSchemaUtils.mergeSchemas(schemas);
+        SchemaMessage schemaMessage = new SchemaMessage(schemaId, schema);
+        try {
+            messageBroker.sendMessage(schemaMessage, null);
+        } catch (Exception e) {
+            LOGGER.log(Level.WARNING, "couldn't send message to SchemaDirectoryService.", e);
+        }
+    }
+}
diff --git a/asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/type/computer/AsterixHeterogeneousTypeComputer.java b/asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/type/computer/AsterixHeterogeneousTypeComputer.java
new file mode 100644
index 0000000..e36a778
--- /dev/null
+++ b/asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/type/computer/AsterixHeterogeneousTypeComputer.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.asterix.schema.type.computer;
+
+import org.apache.asterix.schema.node.IHeterogeneousTypeComputer;
+import org.apache.asterix.schema.node.ISchemaNodeType;
+import org.apache.asterix.schema.node.type.FlatSchemaNodeType;
+
+/**
+ * AsterixDB heterogeneous type computer: decides which schema node type results when two
+ * differing types are observed for the same position.
+ */
+public class AsterixHeterogeneousTypeComputer implements IHeterogeneousTypeComputer {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * @return always {@code true}
+     */
+    @Override
+    public boolean stopInferring() {
+        return true;
+    }
+
+    /**
+     * Resolves two primitive types into one.
+     * <p>
+     * A NULL on either side yields the other side unchanged, and two identical instances yield
+     * that instance. Two distinct instances that are {@code equals} yield the nullable form of
+     * {@code type1} (presumably the same base type differing only in nullability - confirm
+     * against the {@code equals} implementation). Anything else yields ANY.
+     *
+     * @param type1
+     *            first observed type
+     * @param type2
+     *            second observed type
+     * @return the resolved type
+     */
+    @Override
+    public ISchemaNodeType computePrimitiveType(ISchemaNodeType type1, ISchemaNodeType type2) {
+        final ISchemaNodeType resolved;
+        if (type1 == FlatSchemaNodeType.NULL) {
+            resolved = type2;
+        } else if (type2 == FlatSchemaNodeType.NULL) {
+            resolved = type1;
+        } else if (type1 == type2) {
+            resolved = type1;
+        } else if (type1.equals(type2)) {
+            resolved = type1.getNullableType();
+        } else {
+            resolved = FlatSchemaNodeType.ANY;
+        }
+        return resolved;
+    }
+
+    /**
+     * Resolves two nested types into one.
+     *
+     * @param type1
+     *            first observed type (unused)
+     * @param type2
+     *            second observed type (unused)
+     * @return always ANY
+     */
+    @Override
+    public ISchemaNodeType computeNestedType(ISchemaNodeType type1, ISchemaNodeType type2) {
+        return FlatSchemaNodeType.ANY;
+    }
+
+    /**
+     * Resolves a primitive type against a nested type.
+     * <p>
+     * When one side is NULL the result is the nullable form of the other side; otherwise ANY.
+     *
+     * @param type1
+     *            first observed type
+     * @param type2
+     *            second observed type
+     * @return the resolved type
+     */
+    @Override
+    public ISchemaNodeType computePrimitiveWithNestedType(ISchemaNodeType type1, ISchemaNodeType type2) {
+        final ISchemaNodeType resolved;
+        if (type1 == FlatSchemaNodeType.NULL) {
+            resolved = type2.getNullableType();
+        } else if (type2 == FlatSchemaNodeType.NULL) {
+            resolved = type1.getNullableType();
+        } else {
+            resolved = FlatSchemaNodeType.ANY;
+        }
+        return resolved;
+    }
+
+    /**
+     * @return the name of this type computer, {@code "Asterix"}
+     */
+    @Override
+    public String getName() {
+        return "Asterix";
+    }
+
+}
diff --git a/asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/utils/ResultSchemaUtils.java b/asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/utils/ResultSchemaUtils.java
new file mode 100644
index 0000000..b0e9b0d
--- /dev/null
+++ b/asterixdb/asterix-schema/src/main/java/org/apache/asterix/schema/utils/ResultSchemaUtils.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.schema.utils;
+
+import java.util.List;
+
+import org.apache.asterix.schema.node.AbstractNestedSchemaNode;
+import org.apache.asterix.schema.node.ISchemaNode;
+import org.apache.asterix.schema.node.type.FlatSchemaNodeType;
+
+/**
+ * Utilities and shared constants for working with schema nodes.
+ */
+public class ResultSchemaUtils {
+    public static final String SPACE_INDENT = "    ";
+    public static final boolean BOOLEAN_DUMMY_VALUE = true;
+    public static final String STRING_DUMMY_VALUE = "string";
+    public static final long LONG_DUMMY_VALUE = 1;
+    public static final double DOUBLE_DUMMY_VALUE = 1.1;
+    public static final Object NULL_DUMMY_VALUE = null;
+
+    /**
+     * Merges two schema nodes, both of which are cast to {@link AbstractNestedSchemaNode}.
+     * <p>
+     * The node that drives the merge is {@code node1}, unless its inner type is {@code null}
+     * or NULL, in which case {@code node2} resolves {@code node1} instead.
+     *
+     * @param node1
+     *            first schema node
+     * @param node2
+     *            second schema node
+     * @return whatever {@code resolveInternals} returns for the chosen pairing
+     */
+    private static ISchemaNode mergeSchemas(ISchemaNode node1, ISchemaNode node2) {
+        final AbstractNestedSchemaNode left = (AbstractNestedSchemaNode) node1;
+        final AbstractNestedSchemaNode right = (AbstractNestedSchemaNode) node2;
+
+        if (left.getInnerType() != null && left.getInnerType() != FlatSchemaNodeType.NULL) {
+            return left.resolveInternals(right);
+        }
+
+        return right.resolveInternals(left);
+    }
+
+    /**
+     * Merges all schemas that have been reported by NCs by folding them left to right,
+     * starting from the first element.
+     *
+     * @param schemas
+     *            the schemas to merge; must contain at least one element, since the first
+     *            element is read unconditionally
+     * @return the first schema when the list has a single element, otherwise the result of
+     *         merging every element into the running result in list order
+     */
+    public static ISchemaNode mergeSchemas(List<ISchemaNode> schemas) {
+        ISchemaNode merged = schemas.get(0);
+        for (ISchemaNode next : schemas.subList(1, schemas.size())) {
+            merged = mergeSchemas(merged, next);
+        }
+
+        return merged;
+    }
+}
diff --git a/asterixdb/asterix-schema/src/main/resources/schema-inferencers.properties b/asterixdb/asterix-schema/src/main/resources/schema-inferencers.properties
new file mode 100644
index 0000000..75399ea
--- /dev/null
+++ b/asterixdb/asterix-schema/src/main/resources/schema-inferencers.properties
@@ -0,0 +1,2 @@
+Spark=org.apache.asterix.schema.spark.SparkHeterogeneousTypeComputer
+Asterix=org.apache.asterix.schema.type.computer.AsterixHeterogeneousTypeComputer
diff --git a/asterixdb/pom.xml b/asterixdb/pom.xml
index d7b0fed..54ed14c 100644
--- a/asterixdb/pom.xml
+++ b/asterixdb/pom.xml
@@ -575,6 +575,8 @@
     <module>asterix-replication</module>
     <module>asterix-experiments</module>
     <module>asterix-coverage</module>
+    <module>asterix-schema</module>
+    <module>asterix-schema-spark</module>
   </modules>
 
   <repositories>
diff --git a/build.xml b/build.xml
index 43845ce..177e5c1 100644
--- a/build.xml
+++ b/build.xml
@@ -25,6 +25,8 @@
       <param name="module" value="asterix-tools"/>
       <param name="module" value="asterix-common"/>
       <param name="module" value="asterix-transactions"/>
+      <param name="module" value="asterix-schema"/>
+      <param name="module" value="asterix-schema-spark"/>
     </antcall>
   </target>
 
@@ -42,6 +44,8 @@
       <fileset dir="./asterix-common/${src.dir}"/>
       <fileset dir="./asterix-runtime/${src.dir}"/>
       <fileset dir="./asterix-transactions/${src.dir}"/>
+      <fileset dir="./asterix-schema/${src.dir}"/>
+      <fileset dir="./asterix-schema-spark/${src.dir}"/>
     </cobertura-report>
   </target>
 
@@ -77,6 +81,12 @@
       <fileset dir="./asterix-transactions/target/classes">
         <include name="**/*.class"/>
       </fileset>
+      <fileset dir="./asterix-schema/target/classes">
+        <include name="**/*.class"/>
+      </fileset>
+      <fileset dir="./asterix-schema-spark/target/classes">
+        <include name="**/*.class"/>
+      </fileset>
     </cobertura-instrument>
   </target>
 

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1003
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ia6077216ba457a182e8034ed47536fc5f4dcb639
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Wail Alkowaileet <wael.y.k@gmail.com>

Mime
View raw message