hive-commits mailing list archives

From: gunt...@apache.org
Subject: svn commit: r1508202 [6/48] - in /hive/branches/tez: ./ beeline/src/java/org/apache/hive/beeline/ cli/src/java/org/apache/hadoop/hive/cli/ common/src/java/org/apache/hadoop/hive/common/metrics/ common/src/java/org/apache/hadoop/hive/conf/ common/src/te...
Date: Mon, 29 Jul 2013 21:08:19 GMT
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java Mon Jul 29 21:08:03 2013
@@ -80,6 +80,7 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.exec.ArchiveUtils.PartSpecInfo;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
@@ -102,10 +103,8 @@ import org.apache.hadoop.hive.ql.metadat
 import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.metadata.formatting.JsonMetaDataFormatter;
 import org.apache.hadoop.hive.ql.metadata.formatting.MetaDataFormatUtils;
 import org.apache.hadoop.hive.ql.metadata.formatting.MetaDataFormatter;
-import org.apache.hadoop.hive.ql.metadata.formatting.TextMetaDataFormatter;
 import org.apache.hadoop.hive.ql.parse.AlterTablePartMergeFilesDesc;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
@@ -205,14 +204,7 @@ public class DDLTask extends Task<DDLWor
 
     // Pick the formatter to use to display the results.  Either the
     // normal human readable output or a json object.
-    if ("json".equals(conf.get(
-            HiveConf.ConfVars.HIVE_DDL_OUTPUT_FORMAT.varname, "text"))) {
-      formatter = new JsonMetaDataFormatter();
-    } else {
-      formatter = new TextMetaDataFormatter(
-                      conf.getIntVar(HiveConf.ConfVars.CLIPRETTYOUTPUTNUMCOLS));
-    }
-
+    formatter = MetaDataFormatUtils.getFormatter(conf);
     INTERMEDIATE_ARCHIVED_DIR_SUFFIX =
       HiveConf.getVar(conf, ConfVars.METASTORE_INT_ARCHIVED);
     INTERMEDIATE_ORIGINAL_DIR_SUFFIX =
@@ -437,32 +429,10 @@ public class DDLTask extends Task<DDLWor
       if (alterTableExchangePartition != null) {
         return exchangeTablePartition(db, alterTableExchangePartition);
       }
-
-    } catch (InvalidTableException e) {
-      formatter.consoleError(console, "Table " + e.getTableName() + " does not exist",
-                             formatter.MISSING);
-      LOG.debug(stringifyException(e));
-      return 1;
-    } catch (AlreadyExistsException e) {
-      formatter.consoleError(console, e.getMessage(), formatter.CONFLICT);
-      return 1;
-    } catch (NoSuchObjectException e) {
-      formatter.consoleError(console, e.getMessage(),
-                             "\n" + stringifyException(e),
-                             formatter.MISSING);
-      return 1;
-    } catch (HiveException e) {
-      formatter.consoleError(console,
-                             "FAILED: Error in metadata: " + e.getMessage(),
-                             "\n" + stringifyException(e),
-                             formatter.ERROR);
-      LOG.debug(stringifyException(e));
+    } catch (Throwable e) {
+      setException(e);
+      LOG.error(stringifyException(e));
       return 1;
-    } catch (Exception e) {
-      formatter.consoleError(console, "Failed with exception " + e.getMessage(),
-                             "\n" + stringifyException(e),
-                             formatter.ERROR);
-      return (1);
     }
     assert false;
     return 0;
@@ -538,6 +508,7 @@ public class DDLTask extends Task<DDLWor
             null, null, null, null);
         if (users != null && users.size() > 0) {
           boolean first = true;
+          sortPrivileges(users);
           for (HiveObjectPrivilege usr : users) {
             if (!first) {
               outStream.write(terminator);
@@ -593,6 +564,7 @@ public class DDLTask extends Task<DDLWor
               principalDesc.getType(), dbName, null, null, null);
           if (dbs != null && dbs.size() > 0) {
             boolean first = true;
+            sortPrivileges(dbs);
             for (HiveObjectPrivilege db : dbs) {
               if (!first) {
                 outStream.write(terminator);
@@ -616,6 +588,7 @@ public class DDLTask extends Task<DDLWor
                   columnName);
               if (columnss != null && columnss.size() > 0) {
                 boolean first = true;
+                sortPrivileges(columnss);
                 for (HiveObjectPrivilege col : columnss) {
                   if (!first) {
                     outStream.write(terminator);
@@ -636,6 +609,7 @@ public class DDLTask extends Task<DDLWor
                     .getType(), dbName, tableName, partValues, null);
             if (parts != null && parts.size() > 0) {
               boolean first = true;
+              sortPrivileges(parts);
               for (HiveObjectPrivilege part : parts) {
                 if (!first) {
                   outStream.write(terminator);
@@ -655,6 +629,7 @@ public class DDLTask extends Task<DDLWor
                 dbName, tableName, null, null);
             if (tbls != null && tbls.size() > 0) {
               boolean first = true;
+              sortPrivileges(tbls);
               for (HiveObjectPrivilege tbl : tbls) {
                 if (!first) {
                   outStream.write(terminator);
@@ -687,6 +662,18 @@ public class DDLTask extends Task<DDLWor
     return 0;
   }
 
+  private static void sortPrivileges(List<HiveObjectPrivilege> privileges) {
+    Collections.sort(privileges, new Comparator<HiveObjectPrivilege>() {
+
+      @Override
+      public int compare(HiveObjectPrivilege one, HiveObjectPrivilege other) {
+        return one.getGrantInfo().getPrivilege().compareTo(other.getGrantInfo().getPrivilege());
+      }
+
+    });
+
+  }
+
   private int grantOrRevokePrivileges(List<PrincipalDesc> principals,
       List<PrivilegeDesc> privileges, PrivilegeObjectDesc privSubjectDesc,
       String grantor, PrincipalType grantorType, boolean grantOption, boolean isGrant) {
@@ -881,7 +868,7 @@ public class DDLTask extends Task<DDLWor
       }
       db.alterDatabase(database.getName(), database);
     } else {
-      throw new HiveException("ERROR: The database " + dbName + " does not exist.");
+      throw new HiveException(ErrorMsg.DATABASE_NOT_EXISTS, dbName);
     }
     return 0;
   }
@@ -1000,16 +987,6 @@ public class DDLTask extends Task<DDLWor
    * @return Returns 0 when execution succeeds and above 0 if it fails.
    * @throws HiveException
    */
-  /**
-   * Add a partition to a table.
-   *
-   * @param db
-   *          Database to add the partition to.
-   * @param addPartitionDesc
-   *          Add this partition.
-   * @return Returns 0 when execution succeeds and above 0 if it fails.
-   * @throws HiveException
-   */
   private int addPartition(Hive db, AddPartitionDesc addPartitionDesc) throws HiveException {
 
     Table tbl = db.getTable(addPartitionDesc.getDbName(), addPartitionDesc.getTableName());
@@ -1118,7 +1095,7 @@ public class DDLTask extends Task<DDLWor
     try {
       db.alterTable(tabName, tbl);
     } catch (InvalidOperationException e) {
-      throw new HiveException("Uable to update table");
+      throw new HiveException(e, ErrorMsg.GENERIC_ERROR, "Unable to alter " + tabName);
     }
 
     work.getInputs().add(new ReadEntity(tbl));
@@ -1878,10 +1855,7 @@ public class DDLTask extends Task<DDLWor
     tbl = db.getTable(tabName);
 
     if (!tbl.isPartitioned()) {
-      formatter.consoleError(console,
-                             "Table " + tabName + " is not a partitioned table",
-                             formatter.ERROR);
-      return 1;
+      throw new HiveException(ErrorMsg.TABLE_NOT_PARTITIONED, tabName);
     }
     if (showParts.getPartSpec() != null) {
       parts = db.getPartitionNames(tbl.getDbName(),
@@ -1899,20 +1873,12 @@ public class DDLTask extends Task<DDLWor
 
       formatter.showTablePartitons(outStream, parts);
 
-      ((FSDataOutputStream) outStream).close();
+      outStream.close();
       outStream = null;
-    } catch (FileNotFoundException e) {
-        formatter.logWarn(outStream, "show partitions: " + stringifyException(e),
-                          MetaDataFormatter.ERROR);
-        return 1;
-      } catch (IOException e) {
-        formatter.logWarn(outStream, "show partitions: " + stringifyException(e),
-                          MetaDataFormatter.ERROR);
-        return 1;
     } catch (Exception e) {
-      throw new HiveException(e);
+      throw new HiveException(e, ErrorMsg.GENERIC_ERROR, "show partitions for table " + tabName);
     } finally {
-      IOUtils.closeStream((FSDataOutputStream) outStream);
+      IOUtils.closeStream(outStream);
     }
 
     return 0;
@@ -2212,7 +2178,7 @@ public class DDLTask extends Task<DDLWor
   /**
    * Write a list of the available databases to a file.
    *
-   * @param showDatabases
+   * @param showDatabasesDesc
    *          These are the databases we're interested in.
    * @return Returns 0 when execution succeeds and above 0 if it fails.
    * @throws HiveException
@@ -2237,20 +2203,12 @@ public class DDLTask extends Task<DDLWor
       outStream = fs.create(resFile);
 
       formatter.showDatabases(outStream, databases);
-      ((FSDataOutputStream) outStream).close();
+      outStream.close();
       outStream = null;
-    } catch (FileNotFoundException e) {
-      formatter.logWarn(outStream, "show databases: " + stringifyException(e),
-                        formatter.ERROR);
-      return 1;
-    } catch (IOException e) {
-      formatter.logWarn(outStream, "show databases: " + stringifyException(e),
-                        formatter.ERROR);
-      return 1;
     } catch (Exception e) {
-      throw new HiveException(e.toString());
+      throw new HiveException(e, ErrorMsg.GENERIC_ERROR, "show databases");
     } finally {
-      IOUtils.closeStream((FSDataOutputStream) outStream);
+      IOUtils.closeStream(outStream);
     }
     return 0;
   }
@@ -2272,8 +2230,7 @@ public class DDLTask extends Task<DDLWor
     String dbName = showTbls.getDbName();
 
     if (!db.databaseExists(dbName)) {
-      throw new HiveException("ERROR: The database " + dbName + " does not exist.");
-
+      throw new HiveException(ErrorMsg.DATABASE_NOT_EXISTS, dbName);
     }
     if (showTbls.getPattern() != null) {
       LOG.info("pattern: " + showTbls.getPattern());
@@ -2292,20 +2249,12 @@ public class DDLTask extends Task<DDLWor
 
       SortedSet<String> sortedTbls = new TreeSet<String>(tbls);
       formatter.showTables(outStream, sortedTbls);
-      ((FSDataOutputStream) outStream).close();
+      outStream.close();
       outStream = null;
-    } catch (FileNotFoundException e) {
-      formatter.logWarn(outStream, "show table: " + stringifyException(e),
-                        formatter.ERROR);
-      return 1;
-    } catch (IOException e) {
-      formatter.logWarn(outStream, "show table: " + stringifyException(e),
-                        formatter.ERROR);
-      return 1;
     } catch (Exception e) {
-      throw new HiveException(e.toString());
+      throw new HiveException(e, ErrorMsg.GENERIC_ERROR, "in database" + dbName);
     } finally {
-      IOUtils.closeStream((FSDataOutputStream) outStream);
+      IOUtils.closeStream(outStream);
     }
     return 0;
   }
@@ -2337,8 +2286,7 @@ public class DDLTask extends Task<DDLWor
       ((FSDataOutputStream) outStream).close();
       outStream = null;
     } catch (IOException e) {
-      LOG.warn("show columns: " + stringifyException(e));
-      return 1;
+      throw new HiveException(e, ErrorMsg.GENERIC_ERROR);
     } finally {
       IOUtils.closeStream((FSDataOutputStream) outStream);
     }
@@ -2668,9 +2616,7 @@ public class DDLTask extends Task<DDLWor
       Database database = db.getDatabase(descDatabase.getDatabaseName());
 
       if (database == null) {
-          formatter.error(outStream,
-                          "No such database: " + descDatabase.getDatabaseName(),
-                          formatter.MISSING);
+        throw new HiveException(ErrorMsg.DATABASE_NOT_EXISTS, descDatabase.getDatabaseName());
       } else {
           Map<String, String> params = null;
           if(descDatabase.isExt()) {
@@ -2683,22 +2629,12 @@ public class DDLTask extends Task<DDLWor
                                             database.getLocationUri(),
                                             params);
       }
-      ((FSDataOutputStream) outStream).close();
+      outStream.close();
       outStream = null;
-    } catch (FileNotFoundException e) {
-      formatter.logWarn(outStream,
-                        "describe database: " + stringifyException(e),
-                        formatter.ERROR);
-      return 1;
     } catch (IOException e) {
-      formatter.logWarn(outStream,
-                        "describe database: " + stringifyException(e),
-                        formatter.ERROR);
-      return 1;
-    } catch (Exception e) {
-      throw new HiveException(e.toString());
+      throw new HiveException(e, ErrorMsg.GENERIC_ERROR);
     } finally {
-      IOUtils.closeStream((FSDataOutputStream) outStream);
+      IOUtils.closeStream(outStream);
     }
     return 0;
   }
@@ -2713,7 +2649,7 @@ public class DDLTask extends Task<DDLWor
    * @return Return 0 when execution succeeds and above 0 if it fails.
    */
   private int showTableStatus(Hive db, ShowTableStatusDesc showTblStatus) throws HiveException {
-    // get the tables for the desired pattenn - populate the output stream
+    // get the tables for the desired pattern - populate the output stream
     List<Table> tbls = new ArrayList<Table>();
     Map<String, String> part = showTblStatus.getPartSpec();
     Partition par = null;
@@ -2749,20 +2685,12 @@ public class DDLTask extends Task<DDLWor
 
       formatter.showTableStatus(outStream, db, conf, tbls, part, par);
 
-      ((FSDataOutputStream) outStream).close();
+      outStream.close();
       outStream = null;
-    } catch (FileNotFoundException e) {
-      formatter.logInfo(outStream, "show table status: " + stringifyException(e),
-                        formatter.ERROR);
-      return 1;
-    } catch (IOException e) {
-      formatter.logInfo(outStream, "show table status: " + stringifyException(e),
-                        formatter.ERROR);
-      return 1;
     } catch (Exception e) {
-      throw new HiveException(e);
+      throw new HiveException(e, ErrorMsg.GENERIC_ERROR, "show table status");
     } finally {
-      IOUtils.closeStream((FSDataOutputStream) outStream);
+      IOUtils.closeStream(outStream);
     }
     return 0;
   }
@@ -2835,7 +2763,6 @@ public class DDLTask extends Task<DDLWor
 
     return 0;
   }
-
   /**
    * Write the description of a table to a file.
    *
@@ -2860,36 +2787,26 @@ public class DDLTask extends Task<DDLWor
       if (tbl == null) {
         FileSystem fs = resFile.getFileSystem(conf);
         outStream = fs.create(resFile);
-        String errMsg = "Table " + tableName + " does not exist";
-        formatter.error(outStream, errMsg, formatter.MISSING);
-        ((FSDataOutputStream) outStream).close();
+        outStream.close();
         outStream = null;
-        return 0;
+        throw new HiveException(ErrorMsg.INVALID_TABLE, tableName);
       }
       if (descTbl.getPartSpec() != null) {
         part = db.getPartition(tbl, descTbl.getPartSpec(), false);
         if (part == null) {
           FileSystem fs = resFile.getFileSystem(conf);
           outStream = fs.create(resFile);
-          String errMsg = "Partition " + descTbl.getPartSpec() + " for table "
-              + tableName + " does not exist";
-          formatter.error(outStream, errMsg, formatter.MISSING);
-          ((FSDataOutputStream) outStream).close();
+          outStream.close();
           outStream = null;
-          return 0;
+          throw new HiveException(ErrorMsg.INVALID_PARTITION,
+                  StringUtils.join(descTbl.getPartSpec().keySet(), ','), tableName);
         }
         tbl = part.getTable();
       }
-    } catch (FileNotFoundException e) {
-      formatter.logInfo(outStream, "describe table: " + stringifyException(e),
-                        formatter.ERROR);
-      return 1;
     } catch (IOException e) {
-      formatter.logInfo(outStream, "describe table: " + stringifyException(e),
-                        formatter.ERROR);
-      return 1;
+      throw new HiveException(e, ErrorMsg.GENERIC_ERROR, tableName);
     } finally {
-      IOUtils.closeStream((FSDataOutputStream) outStream);
+      IOUtils.closeStream(outStream);
     }
 
     try {
@@ -2916,21 +2833,13 @@ public class DDLTask extends Task<DDLWor
                               descTbl.isFormatted(), descTbl.isExt(), descTbl.isPretty());
 
       LOG.info("DDLTask: written data for " + tbl.getTableName());
-      ((FSDataOutputStream) outStream).close();
+      outStream.close();
       outStream = null;
 
-    } catch (FileNotFoundException e) {
-      formatter.logInfo(outStream, "describe table: " + stringifyException(e),
-                        formatter.ERROR);
-      return 1;
     } catch (IOException e) {
-      formatter.logInfo(outStream, "describe table: " + stringifyException(e),
-                        formatter.ERROR);
-      return 1;
-    } catch (Exception e) {
-      throw new HiveException(e);
+      throw new HiveException(e, ErrorMsg.GENERIC_ERROR, tableName);
     } finally {
-      IOUtils.closeStream((FSDataOutputStream) outStream);
+      IOUtils.closeStream(outStream);
     }
 
     return 0;
@@ -3009,11 +2918,8 @@ public class DDLTask extends Task<DDLWor
       if (alterTbl.getOp() != AlterTableDesc.AlterTableTypes.ALTERPROTECTMODE) {
         part = db.getPartition(tbl, alterTbl.getPartSpec(), false);
         if (part == null) {
-          formatter.consoleError(console,
-                                 "Partition : " + alterTbl.getPartSpec().toString()
-                                 + " does not exist.",
-                                 formatter.MISSING);
-          return 1;
+          throw new HiveException(ErrorMsg.INVALID_PARTITION,
+                  StringUtils.join(alterTbl.getPartSpec().keySet(), ',') + " for table " + alterTbl.getOldName());
         }
       }
       else {
@@ -3044,10 +2950,7 @@ public class DDLTask extends Task<DDLWor
           while (iterOldCols.hasNext()) {
             String oldColName = iterOldCols.next().getName();
             if (oldColName.equalsIgnoreCase(newColName)) {
-              formatter.consoleError(console,
-                                     "Column '" + newColName + "' exists",
-                                     formatter.CONFLICT);
-              return 1;
+              throw new HiveException(ErrorMsg.DUPLICATE_COLUMN_NAMES, newColName);
             }
           }
           oldCols.add(newCol);
@@ -3078,10 +2981,7 @@ public class DDLTask extends Task<DDLWor
         String oldColName = col.getName();
         if (oldColName.equalsIgnoreCase(newName)
             && !oldColName.equalsIgnoreCase(oldName)) {
-          formatter.consoleError(console,
-                                 "Column '" + newName + "' exists",
-                                 formatter.CONFLICT);
-          return 1;
+          throw new HiveException(ErrorMsg.DUPLICATE_COLUMN_NAMES, newName);
         } else if (oldColName.equalsIgnoreCase(oldName)) {
           col.setName(newName);
           if (type != null && !type.trim().equals("")) {
@@ -3108,17 +3008,11 @@ public class DDLTask extends Task<DDLWor
 
       // did not find the column
       if (!found) {
-        formatter.consoleError(console,
-                               "Column '" + oldName + "' does not exists",
-                               formatter.MISSING);
-        return 1;
+        throw new HiveException(ErrorMsg.INVALID_COLUMN, oldName);
       }
       // after column is not null, but we did not find it.
       if ((afterCol != null && !afterCol.trim().equals("")) && position < 0) {
-        formatter.consoleError(console,
-                               "Column '" + afterCol + "' does not exists",
-                               formatter.MISSING);
-        return 1;
+        throw new HiveException(ErrorMsg.INVALID_COLUMN, afterCol);
       }
 
       if (position >= 0) {
@@ -3139,11 +3033,7 @@ public class DDLTask extends Task<DDLWor
           && !tbl.getSerializationLib().equals(LazySimpleSerDe.class.getName())
           && !tbl.getSerializationLib().equals(ColumnarSerDe.class.getName())
           && !tbl.getSerializationLib().equals(DynamicSerDe.class.getName())) {
-        formatter.consoleError(console,
-                               "Replace columns is not supported for this table. "
-                               + "SerDe may be incompatible.",
-                               formatter.ERROR);
-        return 1;
+        throw new HiveException(ErrorMsg.CANNOT_REPLACE_COLUMNS, alterTbl.getOldName());
       }
       tbl.getTTable().getSd().setCols(alterTbl.getNewCols());
     } else if (alterTbl.getOp() == AlterTableDesc.AlterTableTypes.ADDPROPS) {
@@ -3242,10 +3132,7 @@ public class DDLTask extends Task<DDLWor
         URI locUri = new URI(newLocation);
         if (!locUri.isAbsolute() || locUri.getScheme() == null
             || locUri.getScheme().trim().equals("")) {
-          throw new HiveException(
-              newLocation
-                  + " is not absolute or has no scheme information. "
-                  + "Please specify a complete absolute uri with scheme information.");
+          throw new HiveException(ErrorMsg.BAD_LOCATION_VALUE, newLocation);
         }
         if (part != null) {
           part.setLocation(newLocation);
@@ -3313,34 +3200,18 @@ public class DDLTask extends Task<DDLWor
         tbl.setNumBuckets(alterTbl.getNumberBuckets());
       }
     } else {
-      formatter.consoleError(console,
-                             "Unsupported Alter commnad",
-                             formatter.ERROR);
-      return 1;
+      throw new HiveException(ErrorMsg.UNSUPPORTED_ALTER_TBL_OP, alterTbl.getOp().toString());
     }
 
     if (part == null && allPartitions == null) {
-      if (!updateModifiedParameters(tbl.getTTable().getParameters(), conf)) {
-        return 1;
-      }
-      try {
-        tbl.checkValidity();
-      } catch (HiveException e) {
-        formatter.consoleError(console,
-                               "Invalid table columns : " + e.getMessage(),
-                               formatter.ERROR);
-        return 1;
-      }
+      updateModifiedParameters(tbl.getTTable().getParameters(), conf);
+      tbl.checkValidity();
     } else if (part != null) {
-      if (!updateModifiedParameters(part.getParameters(), conf)) {
-        return 1;
-      }
+      updateModifiedParameters(part.getParameters(), conf);
     }
     else {
       for (Partition tmpPart: allPartitions) {
-        if (!updateModifiedParameters(tmpPart.getParameters(), conf)) {
-          return 1;
-        }
+        updateModifiedParameters(tmpPart.getParameters(), conf);
       }
     }
 
@@ -3354,11 +3225,8 @@ public class DDLTask extends Task<DDLWor
         db.alterPartitions(tbl.getTableName(), allPartitions);
       }
     } catch (InvalidOperationException e) {
-      console.printError("Invalid alter operation: " + e.getMessage());
       LOG.info("alter table: " + stringifyException(e));
-      return 1;
-    } catch (HiveException e) {
-      return 1;
+      throw new HiveException(e, ErrorMsg.GENERIC_ERROR);
     }
 
     // This is kind of hacky - the read entity contains the old table, whereas
@@ -3525,16 +3393,12 @@ public class DDLTask extends Task<DDLWor
    * @param user
    *          user that is doing the updating.
    */
-  private boolean updateModifiedParameters(Map<String, String> params, HiveConf conf) {
+  private boolean updateModifiedParameters(Map<String, String> params, HiveConf conf) throws HiveException {
     String user = null;
     try {
       user = conf.getUser();
     } catch (IOException e) {
-      formatter.consoleError(console,
-                             "Unable to get current user: " + e.getMessage(),
-                             stringifyException(e),
-                             formatter.ERROR);
-      return false;
+      throw new HiveException(e, ErrorMsg.GENERIC_ERROR, "Unable to get current user");
     }
 
     params.put("last_modified_by", user);
@@ -3562,17 +3426,21 @@ public class DDLTask extends Task<DDLWor
    * @param crtDb
    * @return Always returns 0
    * @throws HiveException
-   * @throws AlreadyExistsException
    */
   private int createDatabase(Hive db, CreateDatabaseDesc crtDb)
-      throws HiveException, AlreadyExistsException {
+      throws HiveException {
     Database database = new Database();
     database.setName(crtDb.getName());
     database.setDescription(crtDb.getComment());
     database.setLocationUri(crtDb.getLocationUri());
     database.setParameters(crtDb.getDatabaseProperties());
-
-    db.createDatabase(database, crtDb.getIfNotExists());
+    try {
+      db.createDatabase(database, crtDb.getIfNotExists());
+    }
+    catch (AlreadyExistsException ex) {
+      //it would be better if AlreadyExistsException had an errorCode field....
+      throw new HiveException(ex, ErrorMsg.DATABSAE_ALREADY_EXISTS, crtDb.getName());
+    }
     return 0;
   }
 
@@ -3582,11 +3450,15 @@ public class DDLTask extends Task<DDLWor
    * @param dropDb
    * @return Always returns 0
    * @throws HiveException
-   * @throws NoSuchObjectException
    */
   private int dropDatabase(Hive db, DropDatabaseDesc dropDb)
-      throws HiveException, NoSuchObjectException {
-    db.dropDatabase(dropDb.getDatabaseName(), true, dropDb.getIfExists(), dropDb.isCasdade());
+      throws HiveException {
+    try {
+      db.dropDatabase(dropDb.getDatabaseName(), true, dropDb.getIfExists(), dropDb.isCasdade());
+    }
+    catch (NoSuchObjectException ex) {
+      throw new HiveException(ex, ErrorMsg.DATABASE_NOT_EXISTS, dropDb.getDatabaseName());
+    }
     return 0;
   }
 
@@ -3601,7 +3473,7 @@ public class DDLTask extends Task<DDLWor
       throws HiveException {
     String dbName = switchDb.getDatabaseName();
     if (!db.databaseExists(dbName)) {
-      throw new HiveException("ERROR: The database " + dbName + " does not exist.");
+      throw new HiveException(ErrorMsg.DATABASE_NOT_EXISTS, dbName);
     }
     db.setCurrentDatabase(dbName);
 
@@ -3988,7 +3860,7 @@ public class DDLTask extends Task<DDLWor
         fs.mkdirs(location);
       }
     } catch (Exception e) {
-      throw new HiveException(e);
+      throw new HiveException(e, ErrorMsg.GENERIC_ERROR);
     }
     return 0;
   }
@@ -4023,15 +3895,11 @@ public class DDLTask extends Task<DDLWor
     return locations;
   }
 
-  private int setGenericTableAttributes(Table tbl) {
+  private int setGenericTableAttributes(Table tbl) throws HiveException {
     try {
       tbl.setOwner(conf.getUser());
     } catch (IOException e) {
-      formatter.consoleError(console,
-                             "Unable to get current user: " + e.getMessage(),
-                             stringifyException(e),
-                             formatter.ERROR);
-      return 1;
+      throw new HiveException(e, ErrorMsg.GENERIC_ERROR, "Unable to get current user");
     }
     // set create time
     tbl.setCreateTime((int) (System.currentTimeMillis() / 1000));

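Across the DDLTask hunks above the refactor follows one pattern: per-operation console error formatting (formatter.consoleError / formatter.logWarn plus return 1) gives way to HiveException instances built from ErrorMsg constants, and the single catch (Throwable) in execute() now does the logging and the non-zero return. A minimal sketch of a DDL helper written against that pattern; showFoo and FooDesc are illustrative names, not part of this commit, and the HiveException(Throwable, ErrorMsg, String...) constructor is assumed from the hunks above:

    private int showFoo(Hive db, FooDesc desc) throws HiveException {
      DataOutputStream outStream = null;
      try {
        Path resFile = new Path(desc.getResFile());
        FileSystem fs = resFile.getFileSystem(conf);
        outStream = fs.create(resFile);
        // write results through the MetaDataFormatter chosen in the constructor
        formatter.showDatabases(outStream, db.getAllDatabases());
        outStream.close();
        outStream = null;
      } catch (IOException e) {
        // no per-call console formatting: wrap with an error code and rethrow;
        // execute() logs it and turns it into a non-zero task return value
        throw new HiveException(e, ErrorMsg.GENERIC_ERROR, "show foo");
      } finally {
        IOUtils.closeStream(outStream);
      }
      return 0;
    }
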
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java Mon Jul 29 21:08:03 2013
@@ -39,7 +39,6 @@ import java.util.TreeMap;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.metadata.Table;
@@ -60,6 +59,9 @@ import org.json.JSONObject;
 public class ExplainTask extends Task<ExplainWork> implements Serializable {
   private static final long serialVersionUID = 1L;
   public static final String EXPL_COLUMN_NAME = "Explain";
+  private Set<Operator<? extends OperatorDesc>> visitedOps = new HashSet<Operator<?>>();
+  private boolean isLogical = false;
+
   public ExplainTask() {
     super();
   }
@@ -109,7 +111,33 @@ public class ExplainTask extends Task<Ex
     return outJSONObject;
   }
 
-  static public JSONObject getJSONPlan(PrintStream out, ExplainWork work)
+  public JSONObject getJSONLogicalPlan(PrintStream out, ExplainWork work) throws Exception {
+    isLogical = true;
+
+    JSONObject outJSONObject = new JSONObject();
+    boolean jsonOutput = work.isFormatted();
+    if (jsonOutput) {
+      out = null;
+    }
+
+    if (work.getParseContext() != null) {
+      out.print("LOGICAL PLAN");
+      JSONObject jsonPlan = outputMap(work.getParseContext().getTopOps(), true,
+                                      out, jsonOutput, work.getExtended(), 0);
+      if (out != null) {
+        out.println();
+      }
+
+      if (jsonOutput) {
+        outJSONObject.put("LOGICAL PLAN", jsonPlan);
+      }
+    } else {
+      System.err.println("No parse context!");
+    }
+    return outJSONObject;
+  }
+
+  public JSONObject getJSONPlan(PrintStream out, ExplainWork work)
       throws Exception {
     // If the user asked for a formatted output, dump the json output
     // in the output stream
@@ -161,13 +189,20 @@ public class ExplainTask extends Task<Ex
       OutputStream outS = resFile.getFileSystem(conf).create(resFile);
       out = new PrintStream(outS);
 
-      if (work.getDependency()) {
-        JSONObject jsonDependencies = getJSONDependencies(work);
-        out.print(jsonDependencies);
-      } else {
-        JSONObject jsonPlan = getJSONPlan(out, work);
+      if (work.isLogical()) {
+        JSONObject jsonLogicalPlan = getJSONLogicalPlan(out, work);
         if (work.isFormatted()) {
-          out.print(jsonPlan);
+          out.print(jsonLogicalPlan);
+        }
+      } else {
+        if (work.getDependency()) {
+          JSONObject jsonDependencies = getJSONDependencies(work);
+          out.print(jsonDependencies);
+        } else {
+          JSONObject jsonPlan = getJSONPlan(out, work);
+          if (work.isFormatted()) {
+            out.print(jsonPlan);
+          }
         }
       }
 
@@ -185,7 +220,7 @@ public class ExplainTask extends Task<Ex
     }
   }
 
-  private static String indentString(int indent) {
+  private String indentString(int indent) {
     StringBuilder sb = new StringBuilder();
     for (int i = 0; i < indent; ++i) {
       sb.append(" ");
@@ -194,19 +229,16 @@ public class ExplainTask extends Task<Ex
     return sb.toString();
   }
 
-  private static JSONObject outputMap(Map<?, ?> mp, String header, PrintStream out,
+  private JSONObject outputMap(Map<?, ?> mp, boolean hasHeader, PrintStream out,
       boolean extended, boolean jsonOutput, int indent) throws Exception {
 
-    boolean first_el = true;
     TreeMap<Object, Object> tree = new TreeMap<Object, Object>();
     tree.putAll(mp);
     JSONObject json = jsonOutput ? new JSONObject() : null;
+    if (out != null && hasHeader && !mp.isEmpty()) {
+      out.println();
+    }
     for (Entry<?, ?> ent : tree.entrySet()) {
-      if (first_el && (out != null)) {
-        out.println(header);
-      }
-      first_el = false;
-
       // Print the key
       if (out != null) {
         out.print(indentString(indent));
@@ -252,7 +284,7 @@ public class ExplainTask extends Task<Ex
     return jsonOutput ? json : null;
   }
 
-  private static JSONArray outputList(List<?> l, String header, PrintStream out,
+  private JSONArray outputList(List<?> l, PrintStream out, boolean hasHeader,
       boolean extended, boolean jsonOutput, int indent) throws Exception {
 
     boolean first_el = true;
@@ -260,10 +292,6 @@ public class ExplainTask extends Task<Ex
     JSONArray outputArray = new JSONArray();
 
     for (Object o : l) {
-      if (first_el && (out != null)) {
-        out.print(header);
-      }
-
       if (isPrintable(o)) {
         String delim = first_el ? " " : ", ";
         if (out != null) {
@@ -277,11 +305,11 @@ public class ExplainTask extends Task<Ex
         nl = true;
       }
       else if (o instanceof Serializable) {
-        if (first_el && (out != null)) {
+        if (first_el && (out != null) && hasHeader) {
           out.println();
         }
         JSONObject jsonOut = outputPlan((Serializable) o, out, extended,
-            jsonOutput, jsonOutput ? 0 : indent + 2);
+            jsonOutput, jsonOutput ? 0 : (hasHeader ? indent + 2 : indent));
         if (jsonOutput) {
           outputArray.put(jsonOut);
         }
@@ -297,7 +325,7 @@ public class ExplainTask extends Task<Ex
     return jsonOutput ? outputArray : null;
   }
 
-  private static boolean isPrintable(Object val) {
+  private boolean isPrintable(Object val) {
     if (val instanceof Boolean || val instanceof String
         || val instanceof Integer || val instanceof Long || val instanceof Byte
         || val instanceof Float || val instanceof Double) {
@@ -311,8 +339,13 @@ public class ExplainTask extends Task<Ex
     return false;
   }
 
-  private static JSONObject outputPlan(Serializable work, PrintStream out,
-      boolean extended, boolean jsonOutput, int indent) throws Exception {
+  private JSONObject outputPlan(Serializable work,
+      PrintStream out, boolean extended, boolean jsonOutput, int indent) throws Exception {
+    return outputPlan(work, out, extended, jsonOutput, indent, "");
+  }
+
+  private JSONObject outputPlan(Serializable work, PrintStream out,
+      boolean extended, boolean jsonOutput, int indent, String appendToHeader) throws Exception {
     // Check if work has an explain annotation
     Annotation note = work.getClass().getAnnotation(Explain.class);
 
@@ -324,7 +357,11 @@ public class ExplainTask extends Task<Ex
         keyJSONObject = xpl_note.displayName();
         if (out != null) {
           out.print(indentString(indent));
-          out.println(xpl_note.displayName());
+          if (appendToHeader != null && !appendToHeader.isEmpty()) {
+            out.println(xpl_note.displayName() + appendToHeader);
+          } else {
+            out.println(xpl_note.displayName());
+          }
         }
       }
     }
@@ -336,18 +373,23 @@ public class ExplainTask extends Task<Ex
       Operator<? extends OperatorDesc> operator =
         (Operator<? extends OperatorDesc>) work;
       if (operator.getConf() != null) {
+        String appender = isLogical ? " (" + operator.getOperatorId() + ")" : "";
         JSONObject jsonOut = outputPlan(operator.getConf(), out, extended,
-            jsonOutput, jsonOutput ? 0 : indent);
+            jsonOutput, jsonOutput ? 0 : indent, appender);
         if (jsonOutput) {
           json.put(operator.getOperatorId(), jsonOut);
         }
       }
 
-      if (operator.getChildOperators() != null) {
-        for (Operator<? extends OperatorDesc> op : operator.getChildOperators()) {
-          JSONObject jsonOut = outputPlan(op, out, extended, jsonOutput, jsonOutput ? 0 : indent + 2);
-          if (jsonOutput) {
-            json.put(operator.getOperatorId(), jsonOut);
+      if (!visitedOps.contains(operator) || !isLogical) {
+        visitedOps.add(operator);
+        if (operator.getChildOperators() != null) {
+          int cindent = jsonOutput ? 0 : indent + 2;
+          for (Operator<? extends OperatorDesc> op : operator.getChildOperators()) {
+            JSONObject jsonOut = outputPlan(op, out, extended, jsonOutput, cindent);
+            if (jsonOutput) {
+              json.put(operator.getOperatorId(), jsonOut);
+            }
           }
         }
       }
@@ -391,10 +433,14 @@ public class ExplainTask extends Task<Ex
           }
 
           String header = null;
+          boolean skipHeader = xpl_note.skipHeader();
+          boolean emptyHeader = false;
+
           if (!xpl_note.displayName().equals("")) {
             header = indentString(prop_indents) + xpl_note.displayName() + ":";
           }
           else {
+            emptyHeader = true;
             prop_indents = indent;
             header = indentString(prop_indents);
           }
@@ -402,7 +448,9 @@ public class ExplainTask extends Task<Ex
           // Try the output as a primitive object
           if (isPrintable(val)) {
             if (out != null && shouldPrint(xpl_note, val)) {
-              out.printf("%s ", header);
+              if (!skipHeader) {
+                out.printf("%s ", header);
+              }
               out.println(val);
             }
             if (jsonOutput) {
@@ -410,12 +458,26 @@ public class ExplainTask extends Task<Ex
             }
             continue;
           }
+
+          int ind = 0;
+          if (!jsonOutput) {
+            if (!skipHeader) {
+              ind = prop_indents + 2;
+            } else {
+              ind = indent;
+            }
+          }
+
           // Try this as a map
           try {
             // Go through the map and print out the stuff
             Map<?, ?> mp = (Map<?, ?>) val;
-            JSONObject jsonOut = outputMap(mp, header, out, extended, jsonOutput,
-                jsonOutput ? 0 : prop_indents + 2);
+
+            if (out != null && !skipHeader && mp != null && !mp.isEmpty()) {
+              out.print(header);
+            }
+
+            JSONObject jsonOut = outputMap(mp, !skipHeader && !emptyHeader, out, extended, jsonOutput, ind);
             if (jsonOutput) {
               json.put(header, jsonOut);
             }
@@ -428,8 +490,12 @@ public class ExplainTask extends Task<Ex
           // Try this as a list
           try {
             List<?> l = (List<?>) val;
-            JSONArray jsonOut = outputList(l, header, out, extended, jsonOutput,
-                jsonOutput ? 0 : prop_indents + 2);
+
+            if (out != null && !skipHeader && l != null && !l.isEmpty()) {
+              out.print(header);
+            }
+
+            JSONArray jsonOut = outputList(l, out, !skipHeader && !emptyHeader, extended, jsonOutput, ind);
 
             if (jsonOutput) {
               json.put(header, jsonOut);
@@ -444,11 +510,11 @@ public class ExplainTask extends Task<Ex
           // Finally check if it is serializable
           try {
             Serializable s = (Serializable) val;
-            if (out != null) {
+
+            if (!skipHeader && out != null) {
               out.println(header);
             }
-            JSONObject jsonOut = outputPlan(s, out, extended, jsonOutput,
-                jsonOutput ? 0 : prop_indents + 2);
+            JSONObject jsonOut = outputPlan(s, out, extended, jsonOutput, ind);
             if (jsonOutput) {
               json.put(header, jsonOut);
             }
@@ -483,14 +549,14 @@ public class ExplainTask extends Task<Ex
    * @param val
    * @return
    */
-  private static boolean shouldPrint(Explain exp, Object val) {
+  private boolean shouldPrint(Explain exp, Object val) {
     if (exp.displayOnlyOnTrue() && (val instanceof Boolean) & !((Boolean)val)) {
       return false;
     }
     return true;
   }
 
-  private static JSONObject outputPlan(Task<? extends Serializable> task,
+  private JSONObject outputPlan(Task<? extends Serializable> task,
       PrintStream out, JSONObject parentJSON, boolean extended,
       boolean jsonOutput, HashSet<Task<? extends Serializable>> displayedSet,
       int indent) throws Exception {
@@ -534,7 +600,7 @@ public class ExplainTask extends Task<Ex
     return null;
   }
 
-  private static JSONObject outputDependencies(Task<? extends Serializable> task,
+  private JSONObject outputDependencies(Task<? extends Serializable> task,
       Set<Task<? extends Serializable>> dependeciesTaskSet, PrintStream out,
       JSONObject parentJson, boolean jsonOutput, int indent,
       boolean rootTskCandidate) throws Exception {
@@ -640,7 +706,7 @@ public class ExplainTask extends Task<Ex
     return jsonOutput ? json : null;
   }
 
-  public static String outputAST(String treeString, PrintStream out,
+  public String outputAST(String treeString, PrintStream out,
       boolean jsonOutput, int indent) throws JSONException {
     if (out != null) {
       out.print(indentString(indent));
@@ -652,7 +718,7 @@ public class ExplainTask extends Task<Ex
     return jsonOutput ? treeString : null;
   }
 
-  public static JSONObject outputDependencies(PrintStream out, boolean jsonOutput,
+  public JSONObject outputDependencies(PrintStream out, boolean jsonOutput,
       List<Task<? extends Serializable>> rootTasks, int indent)
       throws Exception {
     if (out != null) {
@@ -676,7 +742,7 @@ public class ExplainTask extends Task<Ex
     return jsonOutput ? json : null;
   }
 
-  public static JSONObject outputStagePlans(PrintStream out, ExplainWork work,
+  public JSONObject outputStagePlans(PrintStream out, ExplainWork work,
       List<Task<? extends Serializable>> rootTasks, int indent)
       throws Exception {
     boolean jsonOutput = work.isFormatted();
@@ -698,10 +764,8 @@ public class ExplainTask extends Task<Ex
    * MethodComparator.
    *
    */
-  public static class MethodComparator implements Comparator {
-    public int compare(Object o1, Object o2) {
-      Method m1 = (Method) o1;
-      Method m2 = (Method) o2;
+  public class MethodComparator implements Comparator<Method> {
+    public int compare(Method m1, Method m2) {
       return m1.getName().compareTo(m2.getName());
     }
   }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java Mon Jul 29 21:08:03 2013
@@ -35,6 +35,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveRecordReader;
@@ -487,11 +488,15 @@ public class FetchOperator implements Se
   public boolean pushRow() throws IOException, HiveException {
     InspectableObject row = getNextRow();
     if (row != null) {
-      operator.process(row.o, 0);
+      pushRow(row);
     }
     return row != null;
   }
 
+  protected void pushRow(InspectableObject row) throws HiveException {
+    operator.process(row.o, 0);
+  }
+
   private transient final InspectableObject inspectable = new InspectableObject();
 
   /**

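The FetchOperator change keeps the fetch loop in pushRow() unchanged and factors the actual hand-off into a protected pushRow(InspectableObject) hook, so a subclass can observe or redirect rows before they reach the downstream operator. A hypothetical subclass illustrating the hook; the class, the counter, and the constructor signature are illustrative only:

    // Hypothetical subclass: counts rows as they are handed to the operator tree.
    public class CountingFetchOperator extends FetchOperator {
      private long pushedRows = 0;

      public CountingFetchOperator(FetchWork work, JobConf job) throws HiveException {
        super(work, job);
      }

      @Override
      protected void pushRow(InspectableObject row) throws HiveException {
        pushedRows++;        // observe the row
        super.pushRow(row);  // then forward it unchanged
      }
    }
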
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java Mon Jul 29 21:08:03 2013
@@ -65,7 +65,7 @@ public class FetchTask extends Task<Fetc
 
     try {
       // Create a file system handle
-      JobConf job = new JobConf(conf, ExecDriver.class);
+      JobConf job = new JobConf(conf);
 
       Operator<?> source = work.getSource();
       if (source instanceof TableScanOperator) {

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Mon Jul 29 21:08:03 2013
@@ -338,7 +338,7 @@ public class FileSinkOperator extends Te
         jc = (JobConf) hconf;
       } else {
         // test code path
-        jc = new JobConf(hconf, ExecDriver.class);
+        jc = new JobConf(hconf);
       }
 
       if (multiFileSpray) {
@@ -808,7 +808,7 @@ public class FileSinkOperator extends Te
   private String lsDir() {
     String specPath = conf.getDirName();
     // need to get a JobConf here because it's not passed through at client side
-    JobConf jobConf = new JobConf(ExecDriver.class);
+    JobConf jobConf = new JobConf();
     Path tmpPath = Utilities.toTempPath(specPath);
     StringBuilder sb = new StringBuilder("\n");
     try {

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java Mon Jul 29 21:08:03 2013
@@ -343,6 +343,8 @@ public final class FunctionRegistry {
     registerUDF(serdeConstants.STRING_TYPE_NAME, UDFToString.class, false,
         UDFToString.class.getSimpleName());
 
+    registerGenericUDF(serdeConstants.DATE_TYPE_NAME,
+        GenericUDFToDate.class);
     registerGenericUDF(serdeConstants.TIMESTAMP_TYPE_NAME,
         GenericUDFTimestamp.class);
     registerGenericUDF(serdeConstants.BINARY_TYPE_NAME,
@@ -707,6 +709,11 @@ public final class FunctionRegistry {
     if (from.equals(TypeInfoFactory.voidTypeInfo)) {
       return true;
     }
+    // Allow implicit String to Date conversion
+    if (from.equals(TypeInfoFactory.dateTypeInfo)
+        && to.equals(TypeInfoFactory.stringTypeInfo)) {
+      return true;
+    }
 
     if (from.equals(TypeInfoFactory.timestampTypeInfo)
         && to.equals(TypeInfoFactory.stringTypeInfo)) {
@@ -1267,7 +1274,8 @@ public final class FunctionRegistry {
         udfClass == UDFToDouble.class || udfClass == UDFToFloat.class ||
         udfClass == UDFToInteger.class || udfClass == UDFToLong.class ||
         udfClass == UDFToShort.class || udfClass == UDFToString.class ||
-        udfClass == GenericUDFTimestamp.class || udfClass == GenericUDFToBinary.class;
+        udfClass == GenericUDFTimestamp.class || udfClass == GenericUDFToBinary.class ||
+        udfClass == GenericUDFToDate.class;
   }
 
   /**

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java Mon Jul 29 21:08:03 2013
@@ -707,6 +707,7 @@ public class GroupByOperator extends Ope
   @Override
   public void startGroup() throws HiveException {
     firstRowInGroup = true;
+    super.startGroup();
   }
 
   @Override
@@ -750,7 +751,7 @@ public class GroupByOperator extends Ope
               + " #total = " + numRowsInput + " reduction = " + 1.0
               * (numRowsHashTbl / numRowsInput) + " minReduction = "
               + minReductionHashAggr);
-          flush(true);
+          flushHashTable(true);
           hashAggr = false;
         } else {
           LOG.trace("Hash Aggr Enabled: #hash table = " + numRowsHashTbl
@@ -835,7 +836,7 @@ public class GroupByOperator extends Ope
     // happen at boundaries
     if ((!groupKeyIsNotReduceKey || firstRowInGroup)
         && shouldBeFlushed(newKeys)) {
-      flush(false);
+      flushHashTable(false);
     }
   }
 
@@ -983,7 +984,12 @@ public class GroupByOperator extends Ope
     return length;
   }
 
-  private void flush(boolean complete) throws HiveException {
+  /**
+   * Flush hash table. This method is used by hash-based aggregations
+   * @param complete
+   * @throws HiveException
+   */
+  private void flushHashTable(boolean complete) throws HiveException {
 
     countAfterReport = 0;
 
@@ -1048,6 +1054,42 @@ public class GroupByOperator extends Ope
   }
 
   /**
+   * Forward all aggregations to children. It is only used by DemuxOperator.
+   * @throws HiveException
+   */
+  @Override
+  public void flush() throws HiveException{
+    try {
+      if (hashAggregations != null) {
+        LOG.info("Begin Hash Table flush: size = "
+            + hashAggregations.size());
+        Iterator iter = hashAggregations.entrySet().iterator();
+        while (iter.hasNext()) {
+          Map.Entry<KeyWrapper, AggregationBuffer[]> m = (Map.Entry) iter
+              .next();
+
+          forward(m.getKey().getKeyArray(), m.getValue());
+          iter.remove();
+        }
+        hashAggregations.clear();
+      } else if (aggregations != null) {
+        // sort-based aggregations
+        if (currentKeys != null) {
+          forward(currentKeys.getKeyArray(), aggregations);
+        }
+        currentKeys = null;
+      } else {
+        // The GroupByOperator is not initialized, which means there is no
+        // data
+        // (since we initialize the operators when we see the first record).
+        // Just do nothing here.
+      }
+    } catch (Exception e) {
+      throw new HiveException(e);
+    }
+  }
+
+  /**
    * We need to forward all the aggregations to children.
    *
    */
@@ -1088,33 +1130,9 @@ public class GroupByOperator extends Ope
           // create dummy keys - size 0
           forward(new Object[0], aggregations);
         } else {
-          if (hashAggregations != null) {
-            LOG.info("Begin Hash Table flush at close: size = "
-                + hashAggregations.size());
-            Iterator iter = hashAggregations.entrySet().iterator();
-            while (iter.hasNext()) {
-              Map.Entry<KeyWrapper, AggregationBuffer[]> m = (Map.Entry) iter
-                  .next();
-
-              forward(m.getKey().getKeyArray(), m.getValue());
-              iter.remove();
-            }
-            hashAggregations.clear();
-          } else if (aggregations != null) {
-            // sort-based aggregations
-            if (currentKeys != null) {
-              forward(currentKeys.getKeyArray(), aggregations);
-            }
-            currentKeys = null;
-          } else {
-            // The GroupByOperator is not initialized, which means there is no
-            // data
-            // (since we initialize the operators when we see the first record).
-            // Just do nothing here.
-          }
+          flush();
         }
       } catch (Exception e) {
-        e.printStackTrace();
         throw new HiveException(e);
       }
     }

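The new public flush() on GroupByOperator lifts the hash-table and sort-based forwarding out of closeOp() (whose body now simply calls flush()), so another operator can push the buffered aggregations downstream without closing the operator; the javadoc names DemuxOperator as the intended caller. A hedged sketch of such a caller, with the surrounding child-operator loop purely illustrative:

    // Hypothetical: force buffered group-by state downstream at a group boundary.
    for (Operator<? extends OperatorDesc> child : childOperators) {
      if (child instanceof GroupByOperator) {
        ((GroupByOperator) child).flush();
      }
    }
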
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java Mon Jul 29 21:08:03 2013
@@ -87,14 +87,15 @@ public class JoinOperator extends Common
       }
 
       // number of rows for the key in the given table
-      int sz = storage[alias].size();
+      long sz = storage[alias].size();
       StructObjectInspector soi = (StructObjectInspector) inputObjInspectors[tag];
       StructField sf = soi.getStructFieldRef(Utilities.ReduceField.KEY
           .toString());
       List keyObject = (List) soi.getStructFieldData(row, sf);
       // Are we consuming too much memory
-      if (alias == numAliases - 1 && !(handleSkewJoin && skewJoinKeyContext.currBigKeyTag >= 0)) {
-        if (sz == joinEmitInterval) {
+      if (alias == numAliases - 1 && !(handleSkewJoin && skewJoinKeyContext.currBigKeyTag >= 0) &&
+          !hasLeftSemiJoin) {
+        if (sz == joinEmitInterval && !hasFilter(alias)) {
           // The input is sorted by alias, so if we are already in the last join
           // operand,
           // we can emit some results now.

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java Mon Jul 29 21:08:03 2013
@@ -33,9 +33,10 @@ import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
-import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -62,7 +63,7 @@ import org.apache.hadoop.util.StringUtil
  * different from regular operators in that it starts off by processing a
  * Writable data structure from a Table (instead of a Hive Object).
  **/
-public class MapOperator extends Operator<MapredWork> implements Serializable, Cloneable {
+public class MapOperator extends Operator<MapWork> implements Serializable, Cloneable {
 
   private static final long serialVersionUID = 1L;
 
@@ -228,24 +229,26 @@ public class MapOperator extends Operato
    * @param mrwork
    * @throws HiveException
    */
-  public void initializeAsRoot(Configuration hconf, MapredWork mrwork)
+  public void initializeAsRoot(Configuration hconf, MapWork mapWork)
       throws HiveException {
-    setConf(mrwork);
+    setConf(mapWork);
     setChildren(hconf);
     initialize(hconf, null);
   }
 
-  private MapOpCtx initObjectInspector(MapredWork conf,
+  private MapOpCtx initObjectInspector(MapWork conf,
       Configuration hconf, String onefile, Map<TableDesc, StructObjectInspector> convertedOI)
           throws HiveException,
       ClassNotFoundException, InstantiationException, IllegalAccessException,
       SerDeException {
     PartitionDesc pd = conf.getPathToPartitionInfo().get(onefile);
     LinkedHashMap<String, String> partSpec = pd.getPartSpec();
-    // Use tblProps in case of unpartitioned tables
+    // Use the table properties for unpartitioned tables; otherwise use the table
+    // properties overlaid with the partition properties, with the partition values
+    // taking precedence.
     Properties partProps =
         (pd.getPartSpec() == null || pd.getPartSpec().isEmpty()) ?
-            pd.getTableDesc().getProperties() : pd.getProperties();
+            pd.getTableDesc().getProperties() : pd.getOverlayedProperties();
 
     Class serdeclass = pd.getDeserializerClass();
     if (serdeclass == null) {
@@ -409,7 +412,7 @@ public class MapOperator extends Operato
         // If the partition does not exist, use table properties
         Properties partProps =
             (pd.getPartSpec() == null || pd.getPartSpec().isEmpty()) ?
-                tblProps : pd.getProperties();
+                tblProps : pd.getOverlayedProperties();
 
         Class sdclass = pd.getDeserializerClass();
         if (sdclass == null) {
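
Both hunks switch from the raw partition properties to pd.getOverlayedProperties(), i.e. the partition properties layered over the table properties, with the partition values winning. java.util.Properties can express exactly this layering through its defaults chain; the snippet below is a hedged illustration with made-up property values:

    import java.util.Properties;

    // Property names and values below are invented for the demo.
    public class OverlayedPropsDemo {
      public static void main(String[] args) {
        Properties tableProps = new Properties();
        tableProps.setProperty("serialization.format", "1");
        tableProps.setProperty("columns", "a,b,c");

        // Layering via the Properties defaults chain: partition overrides table.
        Properties partProps = new Properties(tableProps);
        partProps.setProperty("serialization.format", "2");

        System.out.println(partProps.getProperty("serialization.format")); // 2  (partition wins)
        System.out.println(partProps.getProperty("columns"));              // a,b,c  (falls back to table)
      }
    }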

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredContext.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredContext.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredContext.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredContext.java Mon Jul 29 21:08:03 2013
@@ -40,17 +40,17 @@ public class MapredContext {
   private static final Log logger = LogFactory.getLog("MapredContext");
   private static final ThreadLocal<MapredContext> contexts = new ThreadLocal<MapredContext>();
 
-  static MapredContext get() {
+  public static MapredContext get() {
     return contexts.get();
   }
 
-  static MapredContext init(boolean isMap, JobConf jobConf) {
+  public static MapredContext init(boolean isMap, JobConf jobConf) {
     MapredContext context = new MapredContext(isMap, jobConf);
     contexts.set(context);
     return context;
   }
 
-  static void close() {
+  public static void close() {
     MapredContext context = contexts.get();
     if (context != null) {
       context.closeAll();
@@ -91,7 +91,7 @@ public class MapredContext {
     return jobConf;
   }
 
-  void setReporter(Reporter reporter) {
+  public void setReporter(Reporter reporter) {
     this.reporter = reporter;
   }
 
@@ -139,8 +139,8 @@ public class MapredContext {
     try {
       Method initMethod = func.getClass().getMethod("configure", MapredContext.class);
       return initMethod.getDeclaringClass() != GenericUDF.class &&
-          initMethod.getDeclaringClass() != GenericUDAFEvaluator.class &&
-          initMethod.getDeclaringClass() != GenericUDTF.class;
+        initMethod.getDeclaringClass() != GenericUDAFEvaluator.class &&
+        initMethod.getDeclaringClass() != GenericUDTF.class;
     } catch (Exception e) {
       return false;
     }
@@ -150,7 +150,7 @@ public class MapredContext {
     try {
       Method closeMethod = func.getClass().getMethod("close");
       return closeMethod.getDeclaringClass() != GenericUDF.class &&
-          closeMethod.getDeclaringClass() != GenericUDAFEvaluator.class;
+        closeMethod.getDeclaringClass() != GenericUDAFEvaluator.class;
     } catch (Exception e) {
       return false;
     }
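
The methods above go from package-private to public, presumably so code outside org.apache.hadoop.hive.ql.exec (for instance the Tez execution path this branch is building) can drive the per-thread context lifecycle. A standalone analogue of that ThreadLocal lifecycle, not the Hive class itself:

    // A runner thread initializes the context, user code reads it via get(),
    // and the runner closes it when the task ends.
    public class ThreadLocalContextSketch {
      private static final ThreadLocal<ThreadLocalContextSketch> CONTEXTS =
          new ThreadLocal<ThreadLocalContextSketch>();

      private final boolean isMap;

      private ThreadLocalContextSketch(boolean isMap) {
        this.isMap = isMap;
      }

      public static ThreadLocalContextSketch init(boolean isMap) {
        ThreadLocalContextSketch ctx = new ThreadLocalContextSketch(isMap);
        CONTEXTS.set(ctx);
        return ctx;
      }

      public static ThreadLocalContextSketch get() {
        return CONTEXTS.get();
      }

      public static void close() {
        CONTEXTS.remove();
      }

      public static void main(String[] args) {
        init(true);                           // done by the task runner
        System.out.println(get().isMap);      // looked up later by user code such as a UDF
        close();                              // done when the task finishes
      }
    }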

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java Mon Jul 29 21:08:03 2013
@@ -40,6 +40,8 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.metastore.api.Order;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
+import org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask;
 import org.apache.hadoop.hive.ql.hooks.LineageInfo.DataContainer;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
@@ -57,6 +59,7 @@ import org.apache.hadoop.hive.ql.plan.Dy
 import org.apache.hadoop.hive.ql.plan.LoadFileDesc;
 import org.apache.hadoop.hive.ql.plan.LoadMultiFilesDesc;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
+import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
@@ -304,9 +307,13 @@ public class MoveTask extends Task<MoveW
             // the directory this move task is moving
             if (task instanceof MapRedTask) {
               MapredWork work = (MapredWork)task.getWork();
-              bucketCols = work.getBucketedColsByDirectory().get(path);
-              sortCols = work.getSortedColsByDirectory().get(path);
-              numBuckets = work.getNumReduceTasks();
+              MapWork mapWork = work.getMapWork();
+              bucketCols = mapWork.getBucketedColsByDirectory().get(path);
+              sortCols = mapWork.getSortedColsByDirectory().get(path);
+              if (work.getReduceWork() != null) {
+                numBuckets = work.getReduceWork().getNumReduceTasks();
+              }
+
               if (bucketCols != null || sortCols != null) {
                 // This must be a final map reduce task (the task containing the file sink
                 // operator that writes the final output)
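
With MapredWork split into a map half and an optional reduce half, the reducer count is now read through a null guard. A tiny stand-alone illustration of the guard (types are stand-ins for the Hive plan classes, not the real ones):

    // A map-only job has no reduce work, so the reducer count must be read defensively.
    public class ReduceCountGuardSketch {
      static class ReduceWorkSketch { int numReduceTasks = 4; }

      ReduceWorkSketch reduceWork;            // stays null for a map-only job

      int numBuckets() {
        return reduceWork != null ? reduceWork.numReduceTasks : 0;
      }

      public static void main(String[] args) {
        System.out.println(new ReduceCountGuardSketch().numBuckets()); // 0, no NPE for map-only work
      }
    }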

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Mon Jul 29 21:08:03 2013
@@ -27,10 +27,12 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -76,7 +78,7 @@ public abstract class Operator<T extends
 
   private transient ExecMapperContext execContext;
 
-  private static int seqId;
+  private static AtomicInteger seqId;
 
  // It can be optimized later so that an operator operation (init/close) is performed
   // only after that operation has been performed on all the parents. This will require
@@ -103,17 +105,17 @@ public abstract class Operator<T extends
   // all operators
 
   static {
-    seqId = 0;
+    seqId = new AtomicInteger(0);
   }
 
   private boolean useBucketizedHiveInputFormat;
 
   public Operator() {
-    id = String.valueOf(seqId++);
+    id = String.valueOf(seqId.getAndIncrement());
   }
 
   public static void resetId() {
-    seqId = 0;
+    seqId.set(0);
   }
 
   /**
@@ -123,8 +125,8 @@ public abstract class Operator<T extends
    *          Used to report progress of certain operators.
    */
   public Operator(Reporter reporter) {
+    this();
     this.reporter = reporter;
-    id = String.valueOf(seqId++);
   }
 
   public void setChildOperators(
@@ -435,7 +437,7 @@ public abstract class Operator<T extends
    *          parent operator id
    * @throws HiveException
    */
-  private void initialize(Configuration hconf, ObjectInspector inputOI,
+  protected void initialize(Configuration hconf, ObjectInspector inputOI,
       int parentId) throws HiveException {
     LOG.info("Initializing child " + id + " " + getName());
     // Double the size of the array if needed
@@ -523,7 +525,7 @@ public abstract class Operator<T extends
     LOG.debug("Start group Done");
   }
 
-  // If a operator wants to do some work at the end of a group
+  // If an operator wants to do some work at the end of a group
   public void endGroup() throws HiveException {
     LOG.debug("Ending group");
 
@@ -543,6 +545,20 @@ public abstract class Operator<T extends
     LOG.debug("End group Done");
   }
 
+  // a blocking operator (e.g. GroupByOperator and JoinOperator) can
+  // override this method to forward its outputs
+  public void flush() throws HiveException {
+  }
+
+  public void processGroup(int tag) throws HiveException {
+    if (childOperators == null) {
+      return;
+    }
+    for (int i = 0; i < childOperatorsArray.length; i++) {
+      childOperatorsArray[i].processGroup(childOperatorsTag[i]);
+    }
+  }
+
   protected boolean allInitializedParentsAreClosed() {
     if (parentOperators != null) {
       for (Operator<? extends OperatorDesc> parent : parentOperators) {
@@ -779,7 +795,7 @@ public abstract class Operator<T extends
     parentOperators.set(parentIndex, newParent);
   }
 
-  private long getNextCntr(long cntr) {
+  protected long getNextCntr(long cntr) {
     // A very simple counter to keep track of number of rows processed by an
     // operator. It dumps
     // every 1 million times, and quickly before that
@@ -1480,6 +1496,7 @@ public abstract class Operator<T extends
     return true;
   }
 
+  @Override
   public String toString() {
     return getName() + "[" + getIdentifier() + "]";
   }
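
The seqId change replaces a bare static int with an AtomicInteger, so concurrently constructed operators still receive unique IDs. A small sketch of why that matters, assuming operator trees may be built from more than one thread:

    import java.util.concurrent.atomic.AtomicInteger;

    public class SeqIdSketch {
      private static final AtomicInteger SEQ = new AtomicInteger(0);

      // Each instance takes the next value atomically, mirroring Operator's id.
      final String id = String.valueOf(SEQ.getAndIncrement());

      public static void main(String[] args) throws InterruptedException {
        Runnable build = new Runnable() {
          @Override
          public void run() {
            for (int i = 0; i < 1000; i++) {
              new SeqIdSketch();
            }
          }
        };
        Thread a = new Thread(build);
        Thread b = new Thread(build);
        a.start(); b.start();
        a.join(); b.join();
        // 2000 constructions -> the counter reads exactly 2000; with a bare
        // "seqId++" two threads could observe (and reuse) the same value.
        System.out.println(SEQ.get());
      }
    }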

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java Mon Jul 29 21:08:03 2013
@@ -23,6 +23,8 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.hive.ql.plan.CollectDesc;
+import org.apache.hadoop.hive.ql.plan.MuxDesc;
+import org.apache.hadoop.hive.ql.plan.DemuxDesc;
 import org.apache.hadoop.hive.ql.plan.DummyStoreDesc;
 import org.apache.hadoop.hive.ql.plan.ExtractDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -98,6 +100,10 @@ public final class OperatorFactory {
         HashTableSinkOperator.class));
     opvec.add(new OpTuple<DummyStoreDesc>(DummyStoreDesc.class,
         DummyStoreOperator.class));
+    opvec.add(new OpTuple<DemuxDesc>(DemuxDesc.class,
+        DemuxOperator.class));
+    opvec.add(new OpTuple<MuxDesc>(MuxDesc.class,
+        MuxOperator.class));
   }
 
   public static <T extends OperatorDesc> Operator<T> get(Class<T> opClass) {
@@ -257,7 +263,7 @@ public final class OperatorFactory {
   public static <T extends OperatorDesc> Operator<T> getAndMakeChild(T conf,
       RowSchema rwsch, Map<String, ExprNodeDesc> colExprMap, Operator... oplist) {
     Operator<T> ret = getAndMakeChild(conf, rwsch, oplist);
-    ret.setColumnExprMap(colExprMap);    
+    ret.setColumnExprMap(colExprMap);
     return (ret);
   }
 

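The new Demux/Mux entries follow the factory's existing pattern: each plan descriptor class is mapped to the operator class that executes it. A hedged, self-contained sketch of that registry pattern (names are illustrative stand-ins, not the Hive classes):

    import java.util.HashMap;
    import java.util.Map;

    public class OperatorRegistrySketch {
      interface Desc {}
      public static class MuxDescSketch implements Desc {}
      public static class MuxOperatorSketch {}               // would extend Operator<MuxDesc> in Hive

      private static final Map<Class<? extends Desc>, Class<?>> REGISTRY =
          new HashMap<Class<? extends Desc>, Class<?>>();
      static {
        REGISTRY.put(MuxDescSketch.class, MuxOperatorSketch.class);  // one entry per new operator
      }

      static Object operatorFor(Desc d) throws Exception {
        return REGISTRY.get(d.getClass()).newInstance();
      }

      public static void main(String[] args) throws Exception {
        System.out.println(operatorFor(new MuxDescSketch()).getClass().getSimpleName());
      }
    }
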
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java Mon Jul 29 21:08:03 2013
@@ -1,4 +1,6 @@
 /**
+ * Copyright 2010 The Apache Software Foundation
+ *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -28,6 +30,11 @@ public class OperatorUtils {
     return findOperator(start, clazz, new HashSet<T>());
   }
 
+  public static <T> T findSingleOperator(Operator<?> start, Class<T> clazz) {
+    Set<T> found = findOperator(start, clazz, new HashSet<T>());
+    return found.size() == 1 ? found.iterator().next() : null;
+  }
+
   public static <T> Set<T> findOperators(Collection<Operator<?>> starts, Class<T> clazz) {
     Set<T> found = new HashSet<T>();
     for (Operator<?> start : starts) {
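
findSingleOperator returns a match only when it is unambiguous, otherwise null, which lets callers that need exactly one operator of a given type bail out cleanly. A minimal illustration of that contract:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    public class SingleMatchSketch {
      // Mirrors the contract: exactly one match -> return it, otherwise null.
      static <T> T single(Set<T> found) {
        return found.size() == 1 ? found.iterator().next() : null;
      }

      public static void main(String[] args) {
        System.out.println(single(new HashSet<String>(Arrays.asList("TS_0"))));         // TS_0
        System.out.println(single(new HashSet<String>(Arrays.asList("TS_0", "TS_1")))); // null: ambiguous
      }
    }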

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPersistence.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPersistence.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPersistence.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPersistence.java Mon Jul 29 21:08:03 2013
@@ -442,16 +442,20 @@ public class PTFPersistence {
     @Override
     protected void reset(int startOffset) throws HiveException {
       PTFPersistence.lock(lock.writeLock());
+      currentSize = 0;
       try {
-        currentSize = 0;
-        for(int i=0; i < partitions.size() - 1; i++) {
-          PersistentByteBasedList p = (PersistentByteBasedList)
-                          partitions.remove(0);
-          reusableFiles.add(p.getFile());
-          partitionOffsets.remove(0);
+        for (int i = 0; i < partitions.size() - 1; i++) {
+          ByteBasedList p = partitions.get(i);
+          reusableFiles.add(((PersistentByteBasedList)p).getFile());
         }
-        partitions.get(0).reset(0);
-        partitionOffsets.set(0, currentSize);
+        ByteBasedList memstore = partitions.get(partitions.size() - 1);
+        memstore.reset(0);
+
+        partitions.clear();
+        partitionOffsets.clear();
+
+        partitions.add(memstore);
+        partitionOffsets.add(0);
       }
       finally {
         lock.writeLock().unlock();
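
The rewritten reset() recycles the backing file of every spilled partition and keeps only the in-memory tail partition, cleared, as the single remaining one. A simplified stand-alone sketch of that bookkeeping (plain string lists stand in for Hive's ByteBasedList):

    import java.util.ArrayList;
    import java.util.List;

    public class PartitionResetSketch {
      final List<List<String>> partitions = new ArrayList<List<String>>();
      final List<Integer> partitionOffsets = new ArrayList<Integer>();
      final List<String> reusableFiles = new ArrayList<String>();

      void reset() {
        // Recycle the backing file of every spilled partition (all but the last).
        for (int i = 0; i < partitions.size() - 1; i++) {
          reusableFiles.add("spill-file-" + i);
        }
        // Keep the in-memory tail partition, emptied, as the only one left.
        List<String> memstore = partitions.get(partitions.size() - 1);
        memstore.clear();

        partitions.clear();
        partitionOffsets.clear();
        partitions.add(memstore);
        partitionOffsets.add(0);
      }

      public static void main(String[] args) {
        PartitionResetSketch p = new PartitionResetSketch();
        p.partitions.add(new ArrayList<String>());   // pretend this one spilled to disk
        p.partitions.add(new ArrayList<String>());   // in-memory tail partition
        p.partitionOffsets.add(0);
        p.partitionOffsets.add(100);
        p.reset();
        System.out.println(p.partitions.size() + " partition(s) left, recycled: " + p.reusableFiles);
      }
    }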

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java Mon Jul 29 21:08:03 2013
@@ -46,6 +46,7 @@ import org.apache.hadoop.io.BytesWritabl
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringUtils;
 
@@ -532,22 +533,68 @@ public class ScriptOperator extends Oper
     }
   }
 
+  class CounterStatusProcessor {
+
+    private final String reporterPrefix;
+    private final String counterPrefix;
+    private final String statusPrefix;
+    private final Reporter reporter;
+
+    CounterStatusProcessor(Configuration hconf, Reporter reporter){
+      this.reporterPrefix = HiveConf.getVar(hconf, HiveConf.ConfVars.STREAMREPORTERPERFIX);
+      this.counterPrefix = reporterPrefix + "counter:";
+      this.statusPrefix = reporterPrefix + "status:";
+      this.reporter = reporter;
+    }
+
+    private boolean process(String line) {
+      if (line.startsWith(reporterPrefix)){
+        if (line.startsWith(counterPrefix)){
+          incrCounter(line);
+        }
+        if (line.startsWith(statusPrefix)){
+          setStatus(line);
+        }
+        return true;
+      } else {
+        return false;
+      }
+    }
+
+    private void incrCounter(String line) {
+      String trimmedLine = line.substring(counterPrefix.length()).trim();
+      String[] columns = trimmedLine.split(",");
+      if (columns.length == 3) {
+        try {
+          reporter.incrCounter(columns[0], columns[1], Long.parseLong(columns[2]));
+        } catch (NumberFormatException e) {
+          LOG.warn("Cannot parse counter increment '" + columns[2] +
+              "' from line " + line);
+        }
+      } else {
+        LOG.warn("Cannot parse counter line: " + line);
+      }
+    }
+
+    private void setStatus(String line) {
+      reporter.setStatus(line.substring(statusPrefix.length()).trim());
+    }
+  }
   /**
    * The processor for stderr stream.
-   *
-   * TODO: In the future when we move to hadoop 0.18 and above, we should borrow
-   * the logic from HadoopStreaming: PipeMapRed.java MRErrorThread to support
-   * counters and status updates.
    */
   class ErrorStreamProcessor implements StreamProcessor {
     private long bytesCopied = 0;
     private final long maxBytes;
-
     private long lastReportTime;
+    private CounterStatusProcessor counterStatus;
 
     public ErrorStreamProcessor(int maxBytes) {
       this.maxBytes = maxBytes;
       lastReportTime = 0;
+      if (HiveConf.getBoolVar(hconf, HiveConf.ConfVars.STREAMREPORTERENABLED)){
+        counterStatus = new CounterStatusProcessor(hconf, reporter);
+      }
     }
 
     public void processLine(Writable line) throws HiveException {
@@ -571,6 +618,14 @@ public class ScriptOperator extends Oper
         reporter.progress();
       }
 
+      if (reporter != null) {
+        if (counterStatus != null) {
+          if (counterStatus.process(stringLine)) {
+            return;
+          }
+        }
+      }
+
       if ((maxBytes < 0) || (bytesCopied < maxBytes)) {
         System.err.println(stringLine);
       }
@@ -659,7 +714,7 @@ public class ScriptOperator extends Oper
     for (int i = 0; i < inArgs.length; i++) {
       finalArgv[wrapComponents.length + i] = inArgs[i];
     }
-    return (finalArgv);
+    return finalArgv;
   }
 
   // Code below shamelessly borrowed from Hadoop Streaming
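
CounterStatusProcessor lets a streaming script drive Hadoop counters and status text by writing specially prefixed lines to stderr. The sketch below parses the same "prefix + counter:group,name,amount" and "prefix + status:text" shapes; the literal "reporter:" prefix is an assumption for the demo, since in Hive it comes from the stream reporter prefix configuration:

    public class StderrReporterParserSketch {
      // The "reporter:" literal is assumed for this demo; Hive reads the prefix
      // from its stream reporter configuration.
      static final String PREFIX = "reporter:";
      static final String COUNTER = PREFIX + "counter:";
      static final String STATUS = PREFIX + "status:";

      static void process(String line) {
        if (!line.startsWith(PREFIX)) {
          System.err.println(line);                          // ordinary stderr, pass it through
          return;
        }
        if (line.startsWith(COUNTER)) {
          String[] cols = line.substring(COUNTER.length()).trim().split(",");
          if (cols.length == 3) {                            // group, counter name, increment
            System.out.println("incrCounter(" + cols[0] + ", " + cols[1] + ", "
                + Long.parseLong(cols[2]) + ")");
          }
        } else if (line.startsWith(STATUS)) {
          System.out.println("setStatus(" + line.substring(STATUS.length()).trim() + ")");
        }
      }

      public static void main(String[] args) {
        process("reporter:counter:MyGroup,RowsSeen,42");     // what a script would write to stderr
        process("reporter:status:half way through");
        process("just a normal log line");
      }
    }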

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java Mon Jul 29 21:08:03 2013
@@ -200,7 +200,7 @@ public class TableScanOperator extends O
       jc = (JobConf) hconf;
     } else {
       // test code path
-      jc = new JobConf(hconf, ExecDriver.class);
+      jc = new JobConf(hconf);
     }
 
     currentStat = null;

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java Mon Jul 29 21:08:03 2013
@@ -31,7 +31,6 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.ql.CommandNeedRetryException;
-import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.lib.Node;
@@ -50,6 +49,8 @@ import org.apache.hadoop.util.StringUtil
 public abstract class Task<T extends Serializable> implements Serializable, Node {
 
   private static final long serialVersionUID = 1L;
+  public transient HashMap<String, Long> taskCounters;
+  public transient TaskHandle taskHandle;
   protected transient boolean started;
   protected transient boolean initialized;
   protected transient boolean isdone;
@@ -58,8 +59,6 @@ public abstract class Task<T extends Ser
   protected transient Hive db;
   protected transient LogHelper console;
   protected transient QueryPlan queryPlan;
-  protected transient TaskHandle taskHandle;
-  protected transient HashMap<String, Long> taskCounters;
   protected transient DriverContext driverContext;
   protected transient boolean clonedConf = false;
   protected transient String jobID;
@@ -87,6 +86,7 @@ public abstract class Task<T extends Ser
 
   protected String id;
   protected T work;
+
   public static enum FeedType {
     DYNAMIC_PARTITIONS, // list of dynamic partitions
   };
@@ -95,6 +95,12 @@ public abstract class Task<T extends Ser
 
   protected List<Task<? extends Serializable>> childTasks;
   protected List<Task<? extends Serializable>> parentTasks;
+  /**
+   * This can be set by the Task to provide more information about the failure, in a
+   * TaskResult where the Driver can find it.  It is checked if {@link Task#execute(org.apache.hadoop.hive.ql.DriverContext)}
+   * returns a non-zero code.
+   */
+  private Throwable exception;
 
   public Task() {
     isdone = false;
@@ -494,4 +500,10 @@ public abstract class Task<T extends Ser
   public List<FieldSchema> getResultSchema() {
     return null;
   }
+  Throwable getException() {
+    return exception;
+  }
+  void setException(Throwable ex) {
+    exception = ex;
+  }
 }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java Mon Jul 29 21:08:03 2013
@@ -23,6 +23,8 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
+import org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask;
 import org.apache.hadoop.hive.ql.io.rcfile.merge.BlockMergeTask;
 import org.apache.hadoop.hive.ql.io.rcfile.merge.MergeWork;
 import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanTask;

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskResult.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskResult.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskResult.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskResult.java Mon Jul 29 21:08:03 2013
@@ -20,11 +20,13 @@ package org.apache.hadoop.hive.ql.exec;
 
 /**
  * TaskResult implementation.
+ * Note that different threads may be reading/writing this object.
  **/
 
 public class TaskResult {
-  protected int exitVal;
-  protected boolean runStatus;
+  protected volatile int exitVal;
+  protected volatile boolean runStatus;
+  private volatile Throwable taskError;
 
   public TaskResult() {
     exitVal = -1;
@@ -35,11 +37,21 @@ public class TaskResult {
     this.exitVal = exitVal;
     setRunning(false);
   }
+  public void setExitVal(int exitVal, Throwable taskError) {
+    this.setExitVal(exitVal);
+    this.taskError = taskError;
+  }
 
   public int getExitVal() {
     return exitVal;
   }
 
+  /**
+   * @return details of the error that caused the task to fail, or null if none
+   */
+  public Throwable getTaskError() {
+    return taskError;
+  }
   public boolean isRunning() {
     return runStatus;
   }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java Mon Jul 29 21:08:03 2013
@@ -66,7 +66,7 @@ public class TaskRunner extends Thread {
     } catch (Throwable t) {
       t.printStackTrace();
     }
-    result.setExitVal(exitVal);
+    result.setExitVal(exitVal, tsk.getException());
   }
 
   public static long getTaskRunnerID () {
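
Together, the Task/TaskResult/TaskRunner changes let a failing task hand its root-cause Throwable to the driver instead of only a non-zero return code: the runner thread copies the task's exception into the volatile result fields the driver polls. A self-contained sketch of that propagation path (class names are stand-ins, not the Hive classes):

    public class TaskErrorPropagationSketch {
      static class Result {
        volatile int exitVal = -1;          // written by the runner thread, read by the driver
        volatile Throwable taskError;
      }

      static class FailingTask {
        Throwable exception;                // analogue of Task.setException(...)

        int execute() {
          try {
            throw new IllegalStateException("simulated task failure");
          } catch (Exception e) {
            exception = e;
            return 1;
          }
        }
      }

      public static void main(String[] args) throws InterruptedException {
        final Result result = new Result();
        final FailingTask task = new FailingTask();
        Thread runner = new Thread(new Runnable() {
          @Override
          public void run() {
            int exit = task.execute();
            result.exitVal = exit;          // analogue of result.setExitVal(exit, tsk.getException())
            result.taskError = task.exception;
          }
        });
        runner.start();
        runner.join();
        System.out.println("exit=" + result.exitVal + ", cause=" + result.taskError);
      }
    }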

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/UDFArgumentException.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/UDFArgumentException.java?rev=1508202&r1=1508201&r2=1508202&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/UDFArgumentException.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/UDFArgumentException.java Mon Jul 29 21:08:03 2013
@@ -21,6 +21,8 @@ package org.apache.hadoop.hive.ql.exec;
 import java.lang.reflect.Method;
 import java.lang.reflect.Type;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 
 import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -72,6 +74,8 @@ public class UDFArgumentException extend
     StringBuilder sb = new StringBuilder();
     sb.append(message);
     if (methods != null) {
+      // Sort the methods before emitting them in the error message.
+      sortMethods(methods);
       sb.append(". Possible choices: ");
       for (Method m: methods) {
         Type[] types = m.getGenericParameterTypes();
@@ -89,6 +93,28 @@ public class UDFArgumentException extend
     return sb.toString();
   }
   
+  private static void sortMethods(List<Method> methods) {
+    Collections.sort( methods, new Comparator<Method>(){
+
+      @Override
+      public int compare(Method m1, Method m2) {
+        int result = m1.getName().compareTo(m2.getName());
+        if (result != 0)
+          return result;
+        Type[] types1 = m1.getGenericParameterTypes();
+        Type[] types2 = m2.getGenericParameterTypes();
+        for (int i = 0; i < types1.length && i < types2.length; i++) {
+          String type1 = ObjectInspectorUtils.getTypeNameFromJavaClass(types1[i]);
+          String type2 = ObjectInspectorUtils.getTypeNameFromJavaClass(types2[i]);
+          if ((result = type1.compareTo(type2)) != 0)
+            return result;
+        }
+        return types1.length - types2.length;
+      }
+
+    });
+  }
+  
   /**
    * The UDF or UDAF class that has the ambiguity.
    */
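
Sorting the candidate methods makes the "Possible choices:" list in the error message deterministic instead of depending on reflection order. A trivial illustration of the effect (plain strings stand in for the rendered method signatures):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.List;

    public class SortChoicesSketch {
      public static void main(String[] args) {
        List<String> choices = new ArrayList<String>();
        choices.add("_FUNC_(string)");
        choices.add("_FUNC_(int)");
        choices.add("_FUNC_(bigint)");
        // Sort the rendered signatures so the message is stable across runs.
        Collections.sort(choices, new Comparator<String>() {
          @Override
          public int compare(String a, String b) {
            return a.compareTo(b);
          }
        });
        System.out.println("Possible choices: " + choices);
      }
    }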


