hive-commits mailing list archives

From: gunt...@apache.org
Subject: svn commit: r1561947 [9/17] - in /hive/branches/tez: ./ ant/ ant/src/org/apache/hadoop/hive/ant/ beeline/ cli/ cli/src/java/org/apache/hadoop/hive/cli/ common/ common/src/ common/src/java/org/apache/hadoop/hive/common/type/ common/src/java/org/apache/h...
Date: Tue, 28 Jan 2014 05:48:10 GMT
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/Driver.java?rev=1561947&r1=1561946&r2=1561947&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/Driver.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/Driver.java Tue Jan 28 05:48:03 2014
@@ -55,6 +55,7 @@ import org.apache.hadoop.hive.ql.exec.Ta
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
 import org.apache.hadoop.hive.ql.hooks.Entity;
+import org.apache.hadoop.hive.ql.hooks.Entity.Type;
 import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
 import org.apache.hadoop.hive.ql.hooks.Hook;
 import org.apache.hadoop.hive.ql.hooks.HookContext;
@@ -101,6 +102,9 @@ import org.apache.hadoop.hive.ql.plan.Op
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.processors.CommandProcessor;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject.HivePrivilegeObjectType;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.hadoop.hive.serde2.ByteStream;
@@ -523,57 +527,62 @@ public class Driver implements CommandPr
     SessionState ss = SessionState.get();
     HiveOperation op = ss.getHiveOperation();
     Hive db = sem.getDb();
-    if (op != null) {
-      if (op.equals(HiveOperation.CREATEDATABASE)) {
-        ss.getAuthorizer().authorize(
-            op.getInputRequiredPrivileges(), op.getOutputRequiredPrivileges());
-      } else if (op.equals(HiveOperation.CREATETABLE_AS_SELECT)
-          || op.equals(HiveOperation.CREATETABLE)) {
-        ss.getAuthorizer().authorize(
-            db.getDatabase(SessionState.get().getCurrentDatabase()), null,
-            HiveOperation.CREATETABLE_AS_SELECT.getOutputRequiredPrivileges());
-      } else {
-        if (op.equals(HiveOperation.IMPORT)) {
-          ImportSemanticAnalyzer isa = (ImportSemanticAnalyzer) sem;
-          if (!isa.existsTable()) {
-            ss.getAuthorizer().authorize(
-                db.getDatabase(SessionState.get().getCurrentDatabase()), null,
-                HiveOperation.CREATETABLE_AS_SELECT.getOutputRequiredPrivileges());
-          }
+    if (ss.isAuthorizationModeV2()) {
+      doAuthorizationV2(ss, op, inputs, outputs);
+      return;
+    }
+
+    if (op == null) {
+      throw new HiveException("Operation should not be null");
+    }
+    if (op.equals(HiveOperation.CREATEDATABASE)) {
+      ss.getAuthorizer().authorize(
+          op.getInputRequiredPrivileges(), op.getOutputRequiredPrivileges());
+    } else if (op.equals(HiveOperation.CREATETABLE_AS_SELECT)
+        || op.equals(HiveOperation.CREATETABLE)) {
+      ss.getAuthorizer().authorize(
+          db.getDatabase(SessionState.get().getCurrentDatabase()), null,
+          HiveOperation.CREATETABLE_AS_SELECT.getOutputRequiredPrivileges());
+    } else {
+      if (op.equals(HiveOperation.IMPORT)) {
+        ImportSemanticAnalyzer isa = (ImportSemanticAnalyzer) sem;
+        if (!isa.existsTable()) {
+          ss.getAuthorizer().authorize(
+              db.getDatabase(SessionState.get().getCurrentDatabase()), null,
+              HiveOperation.CREATETABLE_AS_SELECT.getOutputRequiredPrivileges());
         }
       }
-      if (outputs != null && outputs.size() > 0) {
-        for (WriteEntity write : outputs) {
-          if (write.getType() == Entity.Type.DATABASE) {
-            ss.getAuthorizer().authorize(write.getDatabase(),
-                null, op.getOutputRequiredPrivileges());
-            continue;
-          }
-
-          if (write.getType() == WriteEntity.Type.PARTITION) {
-            Partition part = db.getPartition(write.getTable(), write
-                .getPartition().getSpec(), false);
-            if (part != null) {
-              ss.getAuthorizer().authorize(write.getPartition(), null,
-                      op.getOutputRequiredPrivileges());
-              continue;
-            }
-          }
+    }
+    if (outputs != null && outputs.size() > 0) {
+      for (WriteEntity write : outputs) {
+        if (write.getType() == Entity.Type.DATABASE) {
+          ss.getAuthorizer().authorize(write.getDatabase(),
+              null, op.getOutputRequiredPrivileges());
+          continue;
+        }
 
-          if (write.getTable() != null) {
-            ss.getAuthorizer().authorize(write.getTable(), null,
+        if (write.getType() == WriteEntity.Type.PARTITION) {
+          Partition part = db.getPartition(write.getTable(), write
+              .getPartition().getSpec(), false);
+          if (part != null) {
+            ss.getAuthorizer().authorize(write.getPartition(), null,
                     op.getOutputRequiredPrivileges());
+            continue;
           }
         }
 
+        if (write.getTable() != null) {
+          ss.getAuthorizer().authorize(write.getTable(), null,
+                  op.getOutputRequiredPrivileges());
+        }
       }
     }
 
     if (inputs != null && inputs.size() > 0) {
-
       Map<Table, List<String>> tab2Cols = new HashMap<Table, List<String>>();
       Map<Partition, List<String>> part2Cols = new HashMap<Partition, List<String>>();
 
+      //determine if partition level privileges should be checked for input tables
       Map<String, Boolean> tableUsePartLevelAuth = new HashMap<String, Boolean>();
       for (ReadEntity read : inputs) {
         if (read.getType() == Entity.Type.DATABASE) {
@@ -596,6 +605,8 @@ public class Driver implements CommandPr
         }
       }
 
+      //for a select or create-as-select query, populate the partition-to-columns (part2Cols) or
+      // table-to-columns (tab2Cols) mapping
       if (op.equals(HiveOperation.CREATETABLE_AS_SELECT)
           || op.equals(HiveOperation.QUERY)) {
         SemanticAnalyzer querySem = (SemanticAnalyzer) sem;
@@ -691,6 +702,49 @@ public class Driver implements CommandPr
     }
   }
 
+  private void doAuthorizationV2(SessionState ss, HiveOperation op, HashSet<ReadEntity> inputs,
+      HashSet<WriteEntity> outputs) {
+    HiveOperationType hiveOpType = getHiveOperationType(op);
+    List<HivePrivilegeObject> inputsHObjs = getHivePrivObjects(inputs);
+    List<HivePrivilegeObject> outputHObjs = getHivePrivObjects(outputs);
+    ss.getAuthorizerV2().checkPrivileges(hiveOpType, inputsHObjs, outputHObjs);
+    return;
+  }
+
+  private List<HivePrivilegeObject> getHivePrivObjects(HashSet<? extends Entity> inputs) {
+    List<HivePrivilegeObject> hivePrivobjs = new ArrayList<HivePrivilegeObject>();
+    for(Entity input : inputs){
+      HivePrivilegeObjectType privObjType = getHivePrivilegeObjectType(input.getType());
+      //support for authorization on partitions or uri needs to be added
+      HivePrivilegeObject hPrivObject = new HivePrivilegeObject(privObjType,
+          input.getDatabase().getName(),
+          input.getTable().getTableName());
+      hivePrivobjs.add(hPrivObject);
+    }
+    return hivePrivobjs;
+  }
+
+  private HivePrivilegeObjectType getHivePrivilegeObjectType(Type type) {
+    switch(type){
+    case DATABASE:
+      return HivePrivilegeObjectType.DATABASE;
+    case TABLE:
+      return HivePrivilegeObjectType.TABLE;
+    case LOCAL_DIR:
+    case DFS_DIR:
+      return HivePrivilegeObjectType.URI;
+    case PARTITION:
+    case DUMMYPARTITION: //need to determine if a different type is needed for dummy partitions
+      return HivePrivilegeObjectType.PARTITION;
+    default:
+      return null;
+    }
+  }
+
+  private HiveOperationType getHiveOperationType(HiveOperation op) {
+    return HiveOperationType.valueOf(op.name());
+  }
+
   /**
    * @return The current query plan associated with this Driver, if any.
    */
@@ -1234,6 +1288,8 @@ public class Driver implements CommandPr
       Map<TaskResult, TaskRunner> running = new HashMap<TaskResult, TaskRunner>();
 
       DriverContext driverCxt = new DriverContext(runnable, ctx);
+      driverCxt.prepare(plan);
+
       ctx.setHDFSCleanup(true);
 
       SessionState.get().setLastMapRedStatsList(new ArrayList<MapRedStats>());
@@ -1313,6 +1369,8 @@ public class Driver implements CommandPr
           }
         }
 
+        driverCxt.finished(tskRun);
+
         if (SessionState.get() != null) {
           SessionState.get().getHiveHistory().setTaskProperty(queryId, tsk.getId(),
               Keys.TASK_RET_CODE, String.valueOf(exitVal));
@@ -1474,6 +1532,8 @@ public class Driver implements CommandPr
     TaskResult tskRes = new TaskResult();
     TaskRunner tskRun = new TaskRunner(tsk, tskRes);
 
+    cxt.prepare(tskRun);
+
     // Launch Task
     if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.EXECPARALLEL) && tsk.isMapRedTask()) {
       // Launch it in the parallel mode, as a separate thread only for MR tasks

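The V2 branch added above (doAuthorizationV2 and its helpers) converts the compiler-facing types into the authorization-plugin types, and getHiveOperationType does that purely by enum name via HiveOperationType.valueOf(op.name()). A minimal, self-contained sketch of that name-based mapping, using hypothetical Source/Target enums as stand-ins for HiveOperation and HiveOperationType (not part of this patch):

    import java.util.EnumSet;

    public class EnumNameMapping {
      // Hypothetical stand-ins for HiveOperation and HiveOperationType.
      enum Source { QUERY, CREATETABLE, DROPTABLE }
      enum Target { QUERY, CREATETABLE, DROPTABLE, SHOWDATABASES }

      // Same shape as Driver.getHiveOperationType: this only works while every
      // Source constant has an identically named Target constant.
      static Target toTarget(Source op) {
        return Target.valueOf(op.name());
      }

      public static void main(String[] args) {
        for (Source op : EnumSet.allOf(Source.class)) {
          System.out.println(op + " -> " + toTarget(op));
        }
      }
    }

A constant that exists only on the Source side would surface as an IllegalArgumentException from valueOf, which is presumably why the two enums have to be kept in sync.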
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java?rev=1561947&r1=1561946&r2=1561947&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java Tue Jan 28 05:48:03 2014
@@ -18,13 +18,25 @@
 
 package org.apache.hadoop.hive.ql;
 
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.NodeUtils;
+import org.apache.hadoop.hive.ql.exec.NodeUtils.Function;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.StatsTask;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskRunner;
+import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
 import java.util.Queue;
 
-import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.mapred.JobConf;
-
 /**
  * DriverContext.
  *
@@ -38,6 +50,8 @@ public class DriverContext {
 
   Context ctx;
 
+  final Map<String, StatsTask> statsTasks = new HashMap<String, StatsTask>(1);
+
   public DriverContext() {
     this.runnable = null;
     this.ctx = null;
@@ -82,5 +96,42 @@ public class DriverContext {
   public void incCurJobNo(int amount) {
     this.curJobNo = this.curJobNo + amount;
   }
-  
+
+  public void prepare(QueryPlan plan) {
+    // extract stats keys from StatsTask
+    List<Task<?>> rootTasks = plan.getRootTasks();
+    NodeUtils.iterateTask(rootTasks, StatsTask.class, new Function<StatsTask>() {
+      public void apply(StatsTask statsTask) {
+        statsTasks.put(statsTask.getWork().getAggKey(), statsTask);
+      }
+    });
+  }
+
+  public void prepare(TaskRunner runner) {
+  }
+
+  public void finished(TaskRunner runner) {
+    if (statsTasks.isEmpty() || !(runner.getTask() instanceof MapRedTask)) {
+      return;
+    }
+    MapRedTask mapredTask = (MapRedTask) runner.getTask();
+
+    MapWork mapWork = mapredTask.getWork().getMapWork();
+    ReduceWork reduceWork = mapredTask.getWork().getReduceWork();
+    List<Operator> operators = new ArrayList<Operator>(mapWork.getAliasToWork().values());
+    if (reduceWork != null) {
+      operators.add(reduceWork.getReducer());
+    }
+    final List<String> statKeys = new ArrayList<String>(1);
+    NodeUtils.iterate(operators, FileSinkOperator.class, new Function<FileSinkOperator>() {
+      public void apply(FileSinkOperator fsOp) {
+        if (fsOp.getConf().isGatherStats()) {
+          statKeys.add(fsOp.getConf().getStatsAggPrefix());
+        }
+      }
+    });
+    for (String statKey : statKeys) {
+      statsTasks.get(statKey).getWork().setSourceTask(mapredTask);
+    }
+  }
 }

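DriverContext.prepare(plan) above indexes StatsTask instances by their aggregation key, and finished(runner) walks the FileSinkOperators of a completed MapRedTask through NodeUtils.iterate, which visits only the nodes of a requested class. A self-contained sketch of that type-filtered callback traversal, with a hypothetical Function interface standing in for NodeUtils.Function (not part of this patch):

    import java.util.Arrays;
    import java.util.List;

    public class TypedVisit {
      // Hypothetical stand-in for NodeUtils.Function<T>.
      interface Function<T> { void apply(T node); }

      // Visit only the nodes assignable to the requested class, in the spirit of
      // NodeUtils.iterate(operators, FileSinkOperator.class, ...) above.
      static <T> void iterate(List<?> nodes, Class<T> clazz, Function<T> f) {
        for (Object node : nodes) {
          if (clazz.isInstance(node)) {
            f.apply(clazz.cast(node));
          }
        }
      }

      public static void main(String[] args) {
        List<Object> nodes = Arrays.<Object>asList("fsop-1", 42, "fsop-2");
        iterate(nodes, String.class, new Function<String>() {
          public void apply(String s) { System.out.println("visited " + s); }
        });
      }
    }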
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java?rev=1561947&r1=1561946&r2=1561947&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java Tue Jan 28 05:48:03 2014
@@ -45,7 +45,7 @@ public enum ErrorMsg {
   // 30000 to 39999: Runtime errors which Hive thinks may be transient and retrying may succeed.
   // 40000 to 49999: Errors where Hive is unable to advise about retries.
   // In addition to the error code, ErrorMsg also has a SQLState field.
-  // SQLStates are taken from Section 12.5 of ISO-9075.
+  // SQLStates are taken from Section 22.1 of ISO-9075.
   // See http://www.contrib.andrew.cmu.edu/~shadow/sql/sql1992.txt
   // Most will just rollup to the generic syntax error state of 42000, but
   // specific errors can override the that state.
@@ -53,6 +53,7 @@ public enum ErrorMsg {
   // http://dev.mysql.com/doc/refman/5.0/en/connector-j-reference-error-sqlstates.html
   GENERIC_ERROR(40000, "Exception while processing"),
 
+  //========================== 10000 range starts here ========================//
   INVALID_TABLE(10001, "Table not found", "42S02"),
   INVALID_COLUMN(10002, "Invalid column reference"),
   INVALID_INDEX(10003, "Invalid index"),
@@ -370,7 +371,11 @@ public enum ErrorMsg {
   INVALID_DIR(10252, "{0} is not a directory", true),
   NO_VALID_LOCATIONS(10253, "Could not find any valid location to place the jars. " +
   "Please update hive.jar.directory or hive.user.install.directory with a valid location", false),
+  UNNSUPPORTED_AUTHORIZATION_PRINCIPAL_TYPE_GROUP(10254,
+      "Principal type GROUP is not supported in this authorization setting", "28000"),
+  INVALID_TABLE_NAME(10255, "Invalid table name {0}", true),
 
+  //========================== 20000 range starts here ========================//
   SCRIPT_INIT_ERROR(20000, "Unable to initialize custom script."),
   SCRIPT_IO_ERROR(20001, "An error occurred while reading or writing to your custom script. "
       + "It may have crashed with an error."),
@@ -382,6 +387,7 @@ public enum ErrorMsg {
       "tried to create too many dynamic partitions. The maximum number of dynamic partitions " +
       "is controlled by hive.exec.max.dynamic.partitions and hive.exec.max.dynamic.partitions.pernode. "),
 
+  //========================== 30000 range starts here ========================//
   STATSPUBLISHER_NOT_OBTAINED(30000, "StatsPublisher cannot be obtained. " +
     "There was a error to retrieve the StatsPublisher, and retrying " +
     "might help. If you dont want the query to fail because accurate statistics " +

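The new messages such as INVALID_TABLE_NAME use positional placeholders like {0}. Whether ErrorMsg expands them through java.text.MessageFormat or its own substitution is not visible in this diff; the standard-library form of the same idea looks like this (illustrative only, the argument value is made up):

    import java.text.MessageFormat;

    public class MsgTemplateDemo {
      public static void main(String[] args) {
        // "{0}" is a positional placeholder, as in INVALID_TABLE_NAME above.
        String template = "Invalid table name {0}";
        System.out.println(MessageFormat.format(template, "db1.tbl1.extra"));
      }
    }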
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ArchiveUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ArchiveUtils.java?rev=1561947&r1=1561946&r2=1561947&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ArchiveUtils.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ArchiveUtils.java Tue Jan 28 05:48:03 2014
@@ -108,11 +108,11 @@ public final class ArchiveUtils {
       } catch (MetaException e) {
         throw new HiveException("Unable to get partitions directories prefix", e);
       }
-      URI tableDir = tbl.getDataLocation();
+      Path tableDir = tbl.getDataLocation();
       if(tableDir == null) {
         throw new HiveException("Table has no location set");
       }
-      return new Path(tableDir.toString(), prefixSubdir);
+      return new Path(tableDir, prefixSubdir);
     }
     /**
      * Generates name for prefix partial partition specification.

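The ArchiveUtils change above replaces the URI-plus-string construction with the Path(parent, child) constructor, which keeps the parent's scheme and authority without a round trip through toString(). A small sketch, assuming hadoop-common on the classpath; the paths are made up:

    import org.apache.hadoop.fs.Path;

    public class PathChildDemo {
      public static void main(String[] args) {
        // Parent/child composition keeps the scheme and authority of the parent,
        // which is why the diff prefers new Path(tableDir, prefixSubdir) over
        // string concatenation through toString().
        Path tableDir = new Path("hdfs://nn:8020/warehouse/t");
        Path prefix = new Path(tableDir, "ds=2014-01-28");
        System.out.println(prefix);             // hdfs://nn:8020/warehouse/t/ds=2014-01-28
        System.out.println(prefix.getParent()); // hdfs://nn:8020/warehouse/t
        System.out.println(prefix.getName());   // ds=2014-01-28
      }
    }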
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java?rev=1561947&r1=1561946&r2=1561947&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java Tue Jan 28 05:48:03 2014
@@ -22,7 +22,6 @@ import static org.apache.commons.lang.St
 import static org.apache.hadoop.util.StringUtils.stringifyException;
 
 import java.io.BufferedWriter;
-import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -157,6 +156,12 @@ import org.apache.hadoop.hive.ql.plan.Un
 import org.apache.hadoop.hive.ql.plan.UnlockTableDesc;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.hive.ql.security.authorization.Privilege;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthorizer;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrincipal;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrincipal.HivePrincipalType;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilege;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject.HivePrivilegeObjectType;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.Deserializer;
@@ -302,7 +307,7 @@ public class DDLTask extends Task<DDLWor
 
       AddPartitionDesc addPartitionDesc = work.getAddPartitionDesc();
       if (addPartitionDesc != null) {
-        return addPartition(db, addPartitionDesc);
+        return addPartitions(db, addPartitionDesc);
       }
 
       RenamePartitionDesc renamePartitionDesc = work.getRenamePartitionDesc();
@@ -399,7 +404,8 @@ public class DDLTask extends Task<DDLWor
       GrantDesc grantDesc = work.getGrantDesc();
       if (grantDesc != null) {
         return grantOrRevokePrivileges(grantDesc.getPrincipals(), grantDesc
-            .getPrivileges(), grantDesc.getPrivilegeSubjectDesc(), grantDesc.getGrantor(), grantDesc.getGrantorType(), grantDesc.isGrantOption(), true);
+            .getPrivileges(), grantDesc.getPrivilegeSubjectDesc(), grantDesc.getGrantor(),
+            grantDesc.getGrantorType(), grantDesc.isGrantOption(), true);
       }
 
       RevokeDesc revokeDesc = work.getRevokeDesc();
@@ -489,6 +495,11 @@ public class DDLTask extends Task<DDLWor
       boolean grantRole = grantOrRevokeRoleDDL.getGrant();
       List<PrincipalDesc> principals = grantOrRevokeRoleDDL.getPrincipalDesc();
       List<String> roles = grantOrRevokeRoleDDL.getRoles();
+
+      if(SessionState.get().isAuthorizationModeV2()){
+        return grantOrRevokeRoleV2(grantOrRevokeRoleDDL);
+      }
+
       for (PrincipalDesc principal : principals) {
         String userName = principal.getName();
         for (String roleName : roles) {
@@ -507,6 +518,28 @@ public class DDLTask extends Task<DDLWor
     return 0;
   }
 
+  private int grantOrRevokeRoleV2(GrantRevokeRoleDDL grantOrRevokeRoleDDL) throws HiveException {
+    HiveAuthorizer authorizer = SessionState.get().getAuthorizerV2();
+    //convert to the types needed for plugin api
+    HivePrincipal grantorPrinc = null;
+    if(grantOrRevokeRoleDDL.getGrantor() != null){
+      grantorPrinc = new HivePrincipal(grantOrRevokeRoleDDL.getGrantor(),
+          getHivePrincipalType(grantOrRevokeRoleDDL.getGrantorType()));
+    }
+    List<HivePrincipal> hivePrincipals = getHivePrincipals(grantOrRevokeRoleDDL.getPrincipalDesc());
+    List<String> roles = grantOrRevokeRoleDDL.getRoles();
+
+    if(grantOrRevokeRoleDDL.getGrant()){
+      authorizer.grantRole(hivePrincipals, roles,
+          grantOrRevokeRoleDDL.isGrantOption(), grantorPrinc);
+    }
+    else{
+      authorizer.revokeRole(hivePrincipals, roles,
+          grantOrRevokeRoleDDL.isGrantOption(), grantorPrinc);
+    }
+    return 0;
+  }
+
   private int showGrants(ShowGrantDesc showGrantDesc) throws HiveException {
     StringBuilder builder = new StringBuilder();
     try {
@@ -514,6 +547,7 @@ public class DDLTask extends Task<DDLWor
       PrivilegeObjectDesc hiveObjectDesc = showGrantDesc.getHiveObj();
       String principalName = principalDesc.getName();
       if (hiveObjectDesc == null) {
+        //show all privileges for this user
         List<HiveObjectPrivilege> users = db.showPrivilegeGrant(
             HiveObjectType.GLOBAL, principalName, principalDesc.getType(),
             null, null, null, null);
@@ -619,7 +653,14 @@ public class DDLTask extends Task<DDLWor
 
   private int grantOrRevokePrivileges(List<PrincipalDesc> principals,
       List<PrivilegeDesc> privileges, PrivilegeObjectDesc privSubjectDesc,
-      String grantor, PrincipalType grantorType, boolean grantOption, boolean isGrant) {
+      String grantor, PrincipalType grantorType, boolean grantOption, boolean isGrant)
+          throws HiveException {
+
+    if(SessionState.get().isAuthorizationModeV2()){
+      return grantOrRevokePrivilegesV2(principals, privileges, privSubjectDesc, grantor,
+          grantorType, grantOption, isGrant);
+    }
+
     if (privileges == null || privileges.size() == 0) {
       console.printError("No privilege found.");
       return 1;
@@ -638,15 +679,12 @@ public class DDLTask extends Task<DDLWor
         }
         String obj = privSubjectDesc.getObject();
 
+        //get the db, table objects
         if (privSubjectDesc.getTable()) {
-          String[] dbTab = obj.split("\\.");
-          if (dbTab.length == 2) {
-            dbName = dbTab[0];
-            tableName = dbTab[1];
-          } else {
-            dbName = SessionState.get().getCurrentDatabase();
-            tableName = obj;
-          }
+          String[] dbTable = Utilities.getDbTableName(obj);
+          dbName = dbTable[0];
+          tableName = dbTable[1];
+
           dbObj = db.getDatabase(dbName);
           if (dbObj == null) {
             throwNotFound("Database", dbName);
@@ -757,13 +795,83 @@ public class DDLTask extends Task<DDLWor
     return 0;
   }
 
+  private int grantOrRevokePrivilegesV2(List<PrincipalDesc> principals,
+      List<PrivilegeDesc> privileges, PrivilegeObjectDesc privSubjectDesc, String grantor,
+      PrincipalType grantorType, boolean grantOption, boolean isGrant) throws HiveException {
+    HiveAuthorizer authorizer = SessionState.get().getAuthorizerV2();
+
+    //Convert to object types used by the authorization plugin interface
+    List<HivePrincipal> hivePrincipals = getHivePrincipals(principals);
+    List<HivePrivilege> hivePrivileges = getHivePrivileges(privileges);
+    HivePrivilegeObject hivePrivObject = getHivePrivilegeObject(privSubjectDesc);
+    HivePrincipal grantorPrincipal = new HivePrincipal(grantor, getHivePrincipalType(grantorType));
+
+    if(isGrant){
+      authorizer.grantPrivileges(hivePrincipals, hivePrivileges, hivePrivObject,
+          grantorPrincipal, grantOption);
+    }else {
+      authorizer.revokePrivileges(hivePrincipals, hivePrivileges,
+          hivePrivObject, grantorPrincipal, grantOption);
+    }
+    //no exception thrown, so looks good
+    return 0;
+  }
+
+  private HivePrivilegeObject getHivePrivilegeObject(PrivilegeObjectDesc privSubjectDesc)
+      throws HiveException {
+    String [] dbTable = Utilities.getDbTableName(privSubjectDesc.getObject());
+    return new HivePrivilegeObject(getPrivObjectType(privSubjectDesc), dbTable[0], dbTable[1]);
+  }
+
+  private HivePrivilegeObjectType getPrivObjectType(PrivilegeObjectDesc privSubjectDesc) {
+    //TODO: This needs to change to support view once view grant/revoke is supported as
+    // part of HIVE-6181
+    return privSubjectDesc.getTable() ? HivePrivilegeObjectType.TABLE : HivePrivilegeObjectType.DATABASE;
+  }
+
+  private List<HivePrivilege> getHivePrivileges(List<PrivilegeDesc> privileges) {
+    List<HivePrivilege> hivePrivileges = new ArrayList<HivePrivilege>();
+    for(PrivilegeDesc privilege : privileges){
+      hivePrivileges.add(
+          new HivePrivilege(privilege.getPrivilege().toString(), privilege.getColumns()));
+    }
+    return hivePrivileges;
+  }
+
+  private List<HivePrincipal> getHivePrincipals(List<PrincipalDesc> principals) throws HiveException {
+    ArrayList<HivePrincipal> hivePrincipals = new ArrayList<HivePrincipal>();
+    for(PrincipalDesc principal : principals){
+      hivePrincipals.add(
+          new HivePrincipal(principal.getName(), getHivePrincipalType(principal.getType())));
+    }
+    return hivePrincipals;
+  }
+
+  private HivePrincipalType getHivePrincipalType(PrincipalType type) throws HiveException {
+    switch(type){
+    case USER:
+      return HivePrincipalType.USER;
+    case ROLE:
+      return HivePrincipalType.ROLE;
+    case GROUP:
+      throw new HiveException(ErrorMsg.UNNSUPPORTED_AUTHORIZATION_PRINCIPAL_TYPE_GROUP);
+    default:
+      //should not happen as we take care of all existing types
+      throw new HiveException("Unsupported authorization type specified");
+    }
+  }
+
   private void throwNotFound(String objType, String objName) throws HiveException {
     throw new HiveException(objType + " " + objName + " not found");
   }
 
-  private int roleDDL(RoleDDLDesc roleDDLDesc) {
+  private int roleDDL(RoleDDLDesc roleDDLDesc) throws HiveException, IOException {
+    if(SessionState.get().isAuthorizationModeV2()){
+      return roleDDLV2(roleDDLDesc);
+    }
+
+    DataOutputStream outStream = null;
     RoleDDLDesc.RoleOperation operation = roleDDLDesc.getOperation();
-    DataOutput outStream = null;
     try {
       if (operation.equals(RoleDDLDesc.RoleOperation.CREATE_ROLE)) {
         db.createRole(roleDDLDesc.getName(), roleDDLDesc.getRoleOwnerName());
@@ -780,9 +888,20 @@ public class DDLTask extends Task<DDLWor
             outStream.writeBytes(role.getRoleName());
             outStream.write(terminator);
           }
-          ((FSDataOutputStream) outStream).close();
+          outStream.close();
           outStream = null;
         }
+      } else if (operation.equals(RoleDDLDesc.RoleOperation.SHOW_ROLES)) {
+        List<String> roleNames = db.getAllRoleNames();
+        Path resFile = new Path(roleDDLDesc.getResFile());
+        FileSystem fs = resFile.getFileSystem(conf);
+        outStream = fs.create(resFile);
+        for (String roleName : roleNames) {
+          outStream.writeBytes(roleName);
+          outStream.write(terminator);
+        }
+        outStream.close();
+        outStream = null;
       } else {
         throw new HiveException("Unkown role operation "
             + operation.getOperationName());
@@ -802,6 +921,48 @@ public class DDLTask extends Task<DDLWor
     return 0;
   }
 
+  private int roleDDLV2(RoleDDLDesc roleDDLDesc) throws HiveException, IOException {
+    HiveAuthorizer authorizer = SessionState.get().getAuthorizerV2();
+    RoleDDLDesc.RoleOperation operation = roleDDLDesc.getOperation();
+    //call the appropriate hive authorizer function
+    switch(operation){
+    case CREATE_ROLE:
+      authorizer.createRole(roleDDLDesc.getName(), null);
+      break;
+    case DROP_ROLE:
+      authorizer.dropRole(roleDDLDesc.getName());
+      break;
+    case SHOW_ROLE_GRANT:
+      List<String> roles = authorizer.getRoles(new HivePrincipal(roleDDLDesc.getName(),
+          getHivePrincipalType(roleDDLDesc.getPrincipalType())));
+      writeListToFile(roles, roleDDLDesc.getResFile());
+      break;
+    case SHOW_ROLES:
+      List<String> allRoles = authorizer.getAllRoles();
+      writeListToFile(allRoles, roleDDLDesc.getResFile());
+      break;
+    default:
+      throw new HiveException("Unkown role operation "
+          + operation.getOperationName());
+    }
+    return 0;
+  }
+
+  /**
+   * Write list of string entries into given file
+   * @param entries
+   * @param resFile
+   * @throws IOException
+   */
+  private void writeListToFile(List<String> entries, String resFile) throws IOException {
+    StringBuilder sb = new StringBuilder(entries.size()*2);
+    for(String entry : entries){
+      sb.append(entry);
+      sb.append(terminator);
+    }
+    writeToFile(sb.toString(), resFile);
+  }
+
   private int alterDatabase(AlterDatabaseDesc alterDbDesc) throws HiveException {
 
     String dbName = alterDbDesc.getDatabaseName();
@@ -883,8 +1044,8 @@ public class DDLTask extends Task<DDLWor
             }
             if (baseParts != null) {
               for (Partition p : baseParts) {
-                FileSystem fs = p.getPartitionPath().getFileSystem(db.getConf());
-                FileStatus fss = fs.getFileStatus(p.getPartitionPath());
+                FileSystem fs = p.getDataLocation().getFileSystem(db.getConf());
+                FileStatus fss = fs.getFileStatus(p.getDataLocation());
                 basePartTs.put(p.getSpec(), fss.getModificationTime());
               }
             }
@@ -932,61 +1093,20 @@ public class DDLTask extends Task<DDLWor
   }
 
   /**
-   * Add a partition to a table.
+   * Add partitions to a table.
    *
    * @param db
    *          Database to add the partition to.
    * @param addPartitionDesc
-   *          Add this partition.
+   *          Add these partitions.
    * @return Returns 0 when execution succeeds and above 0 if it fails.
    * @throws HiveException
    */
-  private int addPartition(Hive db, AddPartitionDesc addPartitionDesc) throws HiveException {
-
-    Table tbl = db.getTable(addPartitionDesc.getDbName(), addPartitionDesc.getTableName());
-
-    // If the add partition was created with IF NOT EXISTS, then we should
-    // not throw an error if the specified part does exist.
-    Partition checkPart = db.getPartition(tbl, addPartitionDesc.getPartSpec(), false);
-    if (checkPart != null && addPartitionDesc.getIfNotExists()) {
-      return 0;
+  private int addPartitions(Hive db, AddPartitionDesc addPartitionDesc) throws HiveException {
+    List<Partition> parts = db.createPartitions(addPartitionDesc);
+    for (Partition part : parts) {
+      work.getOutputs().add(new WriteEntity(part));
     }
-
-
-
-    if (addPartitionDesc.getLocation() == null) {
-      db.createPartition(tbl, addPartitionDesc.getPartSpec(), null,
-          addPartitionDesc.getPartParams(),
-                    addPartitionDesc.getInputFormat(),
-                    addPartitionDesc.getOutputFormat(),
-                    addPartitionDesc.getNumBuckets(),
-                    addPartitionDesc.getCols(),
-                    addPartitionDesc.getSerializationLib(),
-                    addPartitionDesc.getSerdeParams(),
-                    addPartitionDesc.getBucketCols(),
-                    addPartitionDesc.getSortCols());
-
-    } else {
-      if (tbl.isView()) {
-        throw new HiveException("LOCATION clause illegal for view partition");
-      }
-      // set partition path relative to table
-      db.createPartition(tbl, addPartitionDesc.getPartSpec(), new Path(tbl
-                    .getPath(), addPartitionDesc.getLocation()), addPartitionDesc.getPartParams(),
-                    addPartitionDesc.getInputFormat(),
-                    addPartitionDesc.getOutputFormat(),
-                    addPartitionDesc.getNumBuckets(),
-                    addPartitionDesc.getCols(),
-                    addPartitionDesc.getSerializationLib(),
-                    addPartitionDesc.getSerdeParams(),
-                    addPartitionDesc.getBucketCols(),
-                    addPartitionDesc.getSortCols());
-    }
-
-    Partition part = db
-        .getPartition(tbl, addPartitionDesc.getPartSpec(), false);
-    work.getOutputs().add(new WriteEntity(part));
-
     return 0;
   }
 
@@ -1212,12 +1332,12 @@ public class DDLTask extends Task<DDLWor
     } catch (MetaException e) {
       throw new HiveException("Unable to get partition's directory", e);
     }
-    URI tableDir = tbl.getDataLocation();
+    Path tableDir = tbl.getDataLocation();
     if(tableDir == null) {
       throw new HiveException("Table has no location set");
     }
 
-    String standardLocation = (new Path(tableDir.toString(), subdir)).toString();
+    String standardLocation = (new Path(tableDir, subdir)).toString();
     if(ArchiveUtils.isArchived(p)) {
       return !getOriginalLocation(p).equals(standardLocation);
     } else {
@@ -1266,7 +1386,7 @@ public class DDLTask extends Task<DDLWor
       if(ArchiveUtils.isArchived(p)) {
         originalDir = new Path(getOriginalLocation(p));
       } else {
-        originalDir = p.getPartitionPath();
+        originalDir = p.getDataLocation();
       }
     }
 
@@ -1343,7 +1463,7 @@ public class DDLTask extends Task<DDLWor
       // First create the archive in a tmp dir so that if the job fails, the
       // bad files don't pollute the filesystem
       Path tmpPath = new Path(driverContext.getCtx()
-                    .getExternalTmpFileURI(originalDir.toUri()), "partlevel");
+                    .getExternalTmpPath(originalDir.toUri()), "partlevel");
 
       console.printInfo("Creating " + archiveName +
                         " for " + originalDir.toString());
@@ -1418,8 +1538,8 @@ public class DDLTask extends Task<DDLWor
     // Record this change in the metastore
     try {
       for(Partition p: partitions) {
-        URI originalPartitionUri = ArchiveUtils.addSlash(p.getPartitionPath().toUri());
-        URI test = p.getPartitionPath().toUri();
+        URI originalPartitionUri = ArchiveUtils.addSlash(p.getDataLocation().toUri());
+        URI test = p.getDataLocation().toUri();
         URI harPartitionDir = harHelper.getHarUri(originalPartitionUri, shim);
         StringBuilder authority = new StringBuilder();
         if(harPartitionDir.getUserInfo() != null) {
@@ -1538,9 +1658,7 @@ public class DDLTask extends Task<DDLWor
       throw new HiveException("Haven't found any archive where it should be");
     }
 
-    Path tmpPath = new Path(driverContext
-          .getCtx()
-          .getExternalTmpFileURI(originalDir.toUri()));
+    Path tmpPath = driverContext.getCtx().getExternalTmpPath(originalDir.toUri());
 
     try {
       fs = tmpPath.getFileSystem(conf);
@@ -1548,11 +1666,6 @@ public class DDLTask extends Task<DDLWor
       throw new HiveException(e);
     }
 
-    // Some sanity checks
-    if (originalDir == null) {
-      throw new HiveException("Missing archive data in the partition");
-    }
-
     // Clarification of terms:
     // - The originalDir directory represents the original directory of the
     //   partitions' files. They now contain an archived version of those files
@@ -1870,7 +1983,7 @@ public class DDLTask extends Task<DDLWor
 
     String tableName = showCreateTbl.getTableName();
     Table tbl = db.getTable(tableName, false);
-    DataOutput outStream = null;
+    DataOutputStream outStream = null;
     List<String> duplicateProps = new ArrayList<String>();
     try {
       Path resFile = new Path(showCreateTbl.getResFile());
@@ -1880,7 +1993,7 @@ public class DDLTask extends Task<DDLWor
       if (tbl.isView()) {
         String createTab_stmt = "CREATE VIEW `" + tableName + "` AS " + tbl.getViewExpandedText();
         outStream.writeBytes(createTab_stmt.toString());
-        ((FSDataOutputStream) outStream).close();
+        outStream.close();
         outStream = null;
         return 0;
       }
@@ -2067,7 +2180,7 @@ public class DDLTask extends Task<DDLWor
       createTab_stmt.add(TBL_PROPERTIES, tbl_properties);
 
       outStream.writeBytes(createTab_stmt.render());
-      ((FSDataOutputStream) outStream).close();
+      outStream.close();
       outStream = null;
     } catch (FileNotFoundException e) {
       LOG.info("show create table: " + stringifyException(e));
@@ -2106,7 +2219,7 @@ public class DDLTask extends Task<DDLWor
     indexes = db.getIndexes(tbl.getDbName(), tbl.getTableName(), (short) -1);
 
     // write the results in the file
-    DataOutput outStream = null;
+    DataOutputStream outStream = null;
     try {
       Path resFile = new Path(showIndexes.getResFile());
       FileSystem fs = resFile.getFileSystem(conf);
@@ -2124,7 +2237,7 @@ public class DDLTask extends Task<DDLWor
         outStream.writeBytes(MetaDataFormatUtils.getAllColumnsInformation(index));
       }
 
-      ((FSDataOutputStream) outStream).close();
+      outStream.close();
       outStream = null;
 
     } catch (FileNotFoundException e) {
@@ -2240,7 +2353,7 @@ public class DDLTask extends Task<DDLWor
     }
 
     // write the results in the file
-    DataOutput outStream = null;
+    DataOutputStream outStream = null;
     try {
       Path resFile = new Path(showCols.getResFile());
       FileSystem fs = resFile.getFileSystem(conf);
@@ -2250,7 +2363,7 @@ public class DDLTask extends Task<DDLWor
       cols.addAll(table.getPartCols());
       outStream.writeBytes(
           MetaDataFormatUtils.getAllColumnsInformation(cols, false));
-      ((FSDataOutputStream) outStream).close();
+      outStream.close();
       outStream = null;
     } catch (IOException e) {
       throw new HiveException(e, ErrorMsg.GENERIC_ERROR);
@@ -2281,7 +2394,7 @@ public class DDLTask extends Task<DDLWor
     }
 
     // write the results in the file
-    DataOutput outStream = null;
+    DataOutputStream outStream = null;
     try {
       Path resFile = new Path(showFuncs.getResFile());
       FileSystem fs = resFile.getFileSystem(conf);
@@ -2296,7 +2409,7 @@ public class DDLTask extends Task<DDLWor
         outStream.writeBytes(iterFuncs.next());
         outStream.write(terminator);
       }
-      ((FSDataOutputStream) outStream).close();
+      outStream.close();
       outStream = null;
     } catch (FileNotFoundException e) {
       LOG.warn("show function: " + stringifyException(e));
@@ -2330,7 +2443,7 @@ public class DDLTask extends Task<DDLWor
     }
 
     // write the results in the file
-    DataOutput outStream = null;
+    DataOutputStream outStream = null;
     try {
       Path resFile = new Path(showLocks.getResFile());
       FileSystem fs = resFile.getFileSystem(conf);
@@ -2388,7 +2501,7 @@ public class DDLTask extends Task<DDLWor
         }
         outStream.write(terminator);
       }
-      ((FSDataOutputStream) outStream).close();
+      outStream.close();
       outStream = null;
     } catch (FileNotFoundException e) {
       LOG.warn("show function: " + stringifyException(e));
@@ -2590,7 +2703,7 @@ public class DDLTask extends Task<DDLWor
     String funcName = descFunc.getName();
 
     // write the results in the file
-    DataOutput outStream = null;
+    DataOutputStream outStream = null;
     try {
       Path resFile = new Path(descFunc.getResFile());
       FileSystem fs = resFile.getFileSystem(conf);
@@ -2629,7 +2742,7 @@ public class DDLTask extends Task<DDLWor
 
       outStream.write(terminator);
 
-      ((FSDataOutputStream) outStream).close();
+      outStream.close();
       outStream = null;
     } catch (FileNotFoundException e) {
       LOG.warn("describe function: " + stringifyException(e));
@@ -3211,7 +3324,7 @@ public class DDLTask extends Task<DDLWor
         if (part != null) {
           part.setLocation(newLocation);
         } else {
-          tbl.setDataLocation(locUri);
+          tbl.setDataLocation(new Path(locUri));
         }
       } catch (URISyntaxException e) {
         throw new HiveException(e);
@@ -3668,7 +3781,7 @@ public class DDLTask extends Task<DDLWor
       tbl.setProperty("comment", crtTbl.getComment());
     }
     if (crtTbl.getLocation() != null) {
-      tbl.setDataLocation(new Path(crtTbl.getLocation()).toUri());
+      tbl.setDataLocation(new Path(crtTbl.getLocation()));
     }
 
     if (crtTbl.getSkewedColNames() != null) {
@@ -3806,7 +3919,7 @@ public class DDLTask extends Task<DDLWor
       tbl.setTableName(newTable.getTableName());
 
       if (crtTbl.getLocation() != null) {
-        tbl.setDataLocation(new Path(crtTbl.getLocation()).toUri());
+        tbl.setDataLocation(new Path(crtTbl.getLocation()));
       } else {
         tbl.unsetDataLocation();
       }
@@ -3960,14 +4073,14 @@ public class DDLTask extends Task<DDLWor
     if (partSpec == null) {
       if (table.isPartitioned()) {
         for (Partition partition : db.getPartitions(table)) {
-          locations.add(partition.getPartitionPath());
+          locations.add(partition.getDataLocation());
         }
       } else {
         locations.add(table.getPath());
       }
     } else {
       for (Partition partition : db.getPartitionsByNames(table, partSpec)) {
-        locations.add(partition.getPartitionPath());
+        locations.add(partition.getDataLocation());
       }
     }
     return locations;

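Several of the new V2 helpers in DDLTask above are straight translations into the plugin's types, with getHivePrincipalType rejecting GROUP outright rather than mapping it. A self-contained sketch of that translate-or-fail switch, using hypothetical MetastoreType/PluginType enums as stand-ins for PrincipalType and HivePrincipalType (not part of this patch):

    public class PrincipalMapping {
      // Hypothetical stand-ins for PrincipalType and HivePrincipalType.
      enum MetastoreType { USER, ROLE, GROUP }
      enum PluginType { USER, ROLE }

      // Same shape as DDLTask.getHivePrincipalType: translate what the plugin
      // supports and fail loudly on GROUP instead of passing it through silently.
      static PluginType toPluginType(MetastoreType type) {
        switch (type) {
        case USER:
          return PluginType.USER;
        case ROLE:
          return PluginType.ROLE;
        case GROUP:
          throw new IllegalArgumentException(
              "Principal type GROUP is not supported in this authorization setting");
        default:
          throw new IllegalArgumentException("Unsupported principal type " + type);
        }
      }

      public static void main(String[] args) {
        System.out.println(toPluginType(MetastoreType.USER));
        System.out.println(toPluginType(MetastoreType.ROLE));
      }
    }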
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=1561947&r1=1561946&r2=1561947&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Tue Jan 28 05:48:03 2014
@@ -27,7 +27,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.Stack;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -63,7 +62,6 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.SubStructObjectInspector;
-import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
@@ -253,13 +251,13 @@ public class FileSinkOperator extends Te
     // The movetask that follows subQ1 and subQ2 tasks still moves the directory
     // 'Parent'
     if ((!conf.isLinkedFileSink()) || (dpCtx == null)) {
-      specPath = new Path(conf.getDirName());
+      specPath = conf.getDirName();
       childSpecPathDynLinkedPartitions = null;
       return;
     }
 
-    specPath = new Path(conf.getParentDir());
-    childSpecPathDynLinkedPartitions = Utilities.getFileNameFromDirName(conf.getDirName());
+    specPath = conf.getParentDir();
+    childSpecPathDynLinkedPartitions = conf.getDirName().getName();
   }
 
   @Override
@@ -816,7 +814,7 @@ public class FileSinkOperator extends Te
       throws HiveException {
     try {
       if ((conf != null) && isNativeTable) {
-        String specPath = conf.getDirName();
+        Path specPath = conf.getDirName();
         DynamicPartitionCtx dpCtx = conf.getDynPartCtx();
         if (conf.isLinkedFileSink() && (dpCtx != null)) {
           specPath = conf.getParentDir();
@@ -926,7 +924,7 @@ public class FileSinkOperator extends Te
         postfix = Utilities.join(lbSpec, taskID);
       }
       prefix = Utilities.join(prefix, spSpec, dpSpec);
-      prefix = Utilities.getHashedStatsPrefix(prefix, maxKeyLength, postfix.length());
+      prefix = Utilities.getHashedStatsPrefix(prefix, maxKeyLength);
 
       String key = Utilities.join(prefix, postfix);
 

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java?rev=1561947&r1=1561946&r2=1561947&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java Tue Jan 28 05:48:03 2014
@@ -420,7 +420,6 @@ public final class FunctionRegistry {
     registerGenericUDF(true, LEAD_FUNC_NAME, GenericUDFLead.class);
     registerGenericUDF(true, LAG_FUNC_NAME, GenericUDFLag.class);
 
-    registerHiveUDAFsAsWindowFunctions();
     registerWindowFunction("row_number", new GenericUDAFRowNumber());
     registerWindowFunction("rank", new GenericUDAFRank());
     registerWindowFunction("dense_rank", new GenericUDAFDenseRank());
@@ -1003,6 +1002,10 @@ public final class FunctionRegistry {
       GenericUDAFResolver genericUDAFResolver) {
     FunctionInfo fi = new FunctionInfo(isNative, functionName.toLowerCase(), genericUDAFResolver);
     mFunctions.put(functionName.toLowerCase(), fi);
+
+    // All aggregate functions should also be usable as window functions
+    addFunctionInfoToWindowFunctions(functionName, fi);
+
     registerNativeStatus(fi);
   }
 
@@ -1021,6 +1024,10 @@ public final class FunctionRegistry {
         functionName.toLowerCase(), new GenericUDAFBridge(
         (UDAF) ReflectionUtils.newInstance(udafClass, null)));
     mFunctions.put(functionName.toLowerCase(), fi);
+
+    // All aggregate functions should also be usable as window functions
+    addFunctionInfoToWindowFunctions(functionName, fi);
+
     registerNativeStatus(fi);
   }
 
@@ -1676,16 +1683,14 @@ public final class FunctionRegistry {
   {
     FunctionInfo fInfo = null;
     if (registerAsUDAF) {
+      // Just register the function normally, will also get added to window functions.
       registerGenericUDAF(true, name, wFn);
-      fInfo = getFunctionInfo(name);
     }
     else {
-      fInfo = new FunctionInfo(true,
-          name.toLowerCase(), wFn);
+      name = name.toLowerCase();
+      fInfo = new FunctionInfo(true, name, wFn);
+      addFunctionInfoToWindowFunctions(name, fInfo);
     }
-
-    WindowFunctionInfo wInfo = new WindowFunctionInfo(fInfo);
-    windowFunctions.put(name.toLowerCase(), wInfo);
   }
 
   public static WindowFunctionInfo getWindowFunctionInfo(String name)
@@ -1719,18 +1724,11 @@ public final class FunctionRegistry {
     return false;
   }
 
-  static void registerHiveUDAFsAsWindowFunctions()
-  {
-    Set<String> fNames = getFunctionNames();
-    for(String fName : fNames)
-    {
-      FunctionInfo fInfo = getFunctionInfo(fName);
-      if ( fInfo.isGenericUDAF())
-      {
-        WindowFunctionInfo wInfo = new WindowFunctionInfo(fInfo);
-        windowFunctions.put(fName, wInfo);
-      }
-    }
+  static private void addFunctionInfoToWindowFunctions(String functionName,
+      FunctionInfo functionInfo) {
+    // Assumes that the caller has already verified that functionInfo is for an aggregate function
+    WindowFunctionInfo wInfo = new WindowFunctionInfo(functionInfo);
+    windowFunctions.put(functionName.toLowerCase(), wInfo);
   }
 
   public static boolean isTableFunction(String name)

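The FunctionRegistry change above drops the separate registerHiveUDAFsAsWindowFunctions() pass and instead adds every aggregate to the window-function map at the moment it is registered. A minimal sketch of that single-registration idea, with trivial stand-ins for FunctionInfo and WindowFunctionInfo (not part of this patch):

    import java.util.HashMap;
    import java.util.Map;

    public class MiniRegistry {
      // Hypothetical stand-ins for FunctionInfo and WindowFunctionInfo.
      static class FunctionInfo { final String name; FunctionInfo(String name) { this.name = name; } }
      static class WindowFunctionInfo { final FunctionInfo fi; WindowFunctionInfo(FunctionInfo fi) { this.fi = fi; } }

      static final Map<String, FunctionInfo> mFunctions = new HashMap<String, FunctionInfo>();
      static final Map<String, WindowFunctionInfo> windowFunctions =
          new HashMap<String, WindowFunctionInfo>();

      // Mirrors the new registerGenericUDAF behaviour: every aggregate becomes
      // usable as a window function at registration time, so no second bulk
      // pass over the function names is needed.
      static void registerAggregate(String name, FunctionInfo fi) {
        String key = name.toLowerCase();
        mFunctions.put(key, fi);
        windowFunctions.put(key, new WindowFunctionInfo(fi));
      }

      public static void main(String[] args) {
        registerAggregate("SUM", new FunctionInfo("sum"));
        System.out.println(windowFunctions.containsKey("sum"));  // true
      }
    }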
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java?rev=1561947&r1=1561946&r2=1561947&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java Tue Jan 28 05:48:03 2014
@@ -265,9 +265,9 @@ public class HashTableSinkOperator exten
   public void closeOp(boolean abort) throws HiveException {
     try {
       if (mapJoinTables != null) {
-        // get tmp file URI
-        String tmpURI = this.getExecContext().getLocalWork().getTmpFileURI();
-        LOG.info("Temp URI for side table: " + tmpURI);
+        // get tmp path
+        Path tmpPath = this.getExecContext().getLocalWork().getTmpPath();
+        LOG.info("Temp URI for side table: " + tmpPath);
         for (byte tag = 0; tag < mapJoinTables.length; tag++) {
           // get the key and value
           MapJoinTableContainer tableContainer = mapJoinTables[tag];
@@ -279,10 +279,9 @@ public class HashTableSinkOperator exten
           String fileName = getExecContext().getLocalWork().getBucketFileName(bigBucketFileName);
           // get the tmp URI path; it will be a hdfs path if not local mode
           String dumpFilePrefix = conf.getDumpFilePrefix();
-          String tmpURIPath = Utilities.generatePath(tmpURI, dumpFilePrefix, tag, fileName);
-          console.printInfo(Utilities.now() + "\tDump the side-table into file: " + tmpURIPath);
+          Path path = Utilities.generatePath(tmpPath, dumpFilePrefix, tag, fileName);
+          console.printInfo(Utilities.now() + "\tDump the side-table into file: " + path);
           // get the hashtable file and path
-          Path path = new Path(tmpURIPath);
           FileSystem fs = path.getFileSystem(hconf);
           ObjectOutputStream out = new ObjectOutputStream(new BufferedOutputStream(fs.create(path), 4096));
           try {
@@ -291,7 +290,7 @@ public class HashTableSinkOperator exten
             out.close();
           }
           tableContainer.clear();
-          console.printInfo(Utilities.now() + "\tUpload 1 File to: " + tmpURIPath);
+          console.printInfo(Utilities.now() + "\tUpload 1 File to: " + path);
         }
       }
       super.closeOp(abort);

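The dump loop above now builds the side-table location as a Path and streams the container through an ObjectOutputStream layered over FileSystem.create. A stripped-down sketch of that write path, assuming hadoop-common on the classpath; the dump helper and demo payload are hypothetical, and the real code persists the MapJoinTableContainer rather than calling writeObject directly:

    import java.io.BufferedOutputStream;
    import java.io.ObjectOutputStream;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class DumpToFs {
      // Serialize one object to whichever filesystem owns 'path' (local or HDFS,
      // decided by the path itself), as the dump loop above does per tag.
      static void dump(Path path, Object sideTable, Configuration conf) throws Exception {
        FileSystem fs = path.getFileSystem(conf);
        ObjectOutputStream out =
            new ObjectOutputStream(new BufferedOutputStream(fs.create(path), 4096));
        try {
          out.writeObject(sideTable);
        } finally {
          out.close();
        }
      }

      public static void main(String[] args) throws Exception {
        dump(new Path("file:///tmp/sidetable.hashtable"), "demo-payload", new Configuration());
      }
    }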
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java?rev=1561947&r1=1561946&r2=1561947&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java Tue Jan 28 05:48:03 2014
@@ -20,7 +20,6 @@ package org.apache.hadoop.hive.ql.exec;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -150,7 +149,7 @@ public class JoinOperator extends Common
     if (conf.getHandleSkewJoin()) {
       try {
         for (int i = 0; i < numAliases; i++) {
-          String specPath = conf.getBigKeysDirMap().get((byte) i);
+          Path specPath = conf.getBigKeysDirMap().get((byte) i);
           mvFileToFinalPath(specPath, hconf, success, LOG);
           for (int j = 0; j < numAliases; j++) {
             if (j == i) {
@@ -165,7 +164,7 @@ public class JoinOperator extends Common
         if (success) {
           // move up files
           for (int i = 0; i < numAliases; i++) {
-            String specPath = conf.getBigKeysDirMap().get((byte) i);
+            Path specPath = conf.getBigKeysDirMap().get((byte) i);
             moveUpFiles(specPath, hconf, LOG);
             for (int j = 0; j < numAliases; j++) {
               if (j == i) {
@@ -184,16 +183,15 @@ public class JoinOperator extends Common
     super.jobCloseOp(hconf, success);
   }
 
-  private void moveUpFiles(String specPath, Configuration hconf, Log log)
+  private void moveUpFiles(Path specPath, Configuration hconf, Log log)
       throws IOException, HiveException {
-    FileSystem fs = (new Path(specPath)).getFileSystem(hconf);
-    Path finalPath = new Path(specPath);
+    FileSystem fs = specPath.getFileSystem(hconf);
 
-    if (fs.exists(finalPath)) {
-      FileStatus[] taskOutputDirs = fs.listStatus(finalPath);
+    if (fs.exists(specPath)) {
+      FileStatus[] taskOutputDirs = fs.listStatus(specPath);
       if (taskOutputDirs != null) {
         for (FileStatus dir : taskOutputDirs) {
-          Utilities.renameOrMoveFiles(fs, dir.getPath(), finalPath);
+          Utilities.renameOrMoveFiles(fs, dir.getPath(), specPath);
           fs.delete(dir.getPath(), true);
         }
       }
@@ -210,15 +208,13 @@ public class JoinOperator extends Common
    * @throws IOException
    * @throws HiveException
    */
-  private void  mvFileToFinalPath(String specPath, Configuration hconf,
+  private void  mvFileToFinalPath(Path specPath, Configuration hconf,
       boolean success, Log log) throws IOException, HiveException {
 
-    FileSystem fs = (new Path(specPath)).getFileSystem(hconf);
+    FileSystem fs = specPath.getFileSystem(hconf);
     Path tmpPath = Utilities.toTempPath(specPath);
     Path intermediatePath = new Path(tmpPath.getParent(), tmpPath.getName()
         + ".intermediate");
-    Path finalPath = new Path(specPath);
-    ArrayList<String> emptyBuckets = null;
     if (success) {
       if (fs.exists(tmpPath)) {
         // Step1: rename tmp output folder to intermediate path. After this
@@ -229,8 +225,8 @@ public class JoinOperator extends Common
         // Step2: remove any tmp file or double-committed output files
         Utilities.removeTempOrDuplicateFiles(fs, intermediatePath);
         // Step3: move to the file destination
-        log.info("Moving tmp dir: " + intermediatePath + " to: " + finalPath);
-        Utilities.renameOrMoveFiles(fs, intermediatePath, finalPath);
+        log.info("Moving tmp dir: " + intermediatePath + " to: " + specPath);
+        Utilities.renameOrMoveFiles(fs, intermediatePath, specPath);
       }
     } else {
       fs.delete(tmpPath, true);

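mvFileToFinalPath above publishes output in two renames: the tmp directory is first renamed to an ".intermediate" sibling, duplicate and temp files are cleaned there, and only then is it renamed to the final location, so a failure part-way through never leaves a half-cleaned final directory. A minimal sketch of that two-step commit, again assuming hadoop-common on the classpath (duplicate-file cleanup omitted; the paths in main are made up):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class TwoStepCommit {
      static void commit(Path tmpPath, Path finalPath, Configuration conf) throws Exception {
        FileSystem fs = tmpPath.getFileSystem(conf);
        Path intermediate = new Path(tmpPath.getParent(), tmpPath.getName() + ".intermediate");
        if (fs.exists(tmpPath)) {
          // Step 1: move the task output aside; a crash after this point can no
          // longer be confused with a half-written tmp directory.
          fs.rename(tmpPath, intermediate);
          // Step 2: the real code removes temp/duplicate files here.
          // Step 3: publish to the final location.
          fs.rename(intermediate, finalPath);
        }
      }

      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path tmp = new Path("file:///tmp/demo_tmp");
        Path dst = new Path("file:///tmp/demo_final");
        tmp.getFileSystem(conf).mkdirs(tmp);
        commit(tmp, dst, conf);
      }
    }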
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java?rev=1561947&r1=1561946&r2=1561947&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java Tue Jan 28 05:48:03 2014
@@ -251,7 +251,7 @@ public class MoveTask extends Task<MoveW
           ArrayList<FileStatus> files;
           FileSystem fs;
           try {
-            fs = FileSystem.get(table.getDataLocation(), conf);
+            fs = table.getDataLocation().getFileSystem(conf);
             dirs = fs.globStatus(tbd.getSourcePath());
             files = new ArrayList<FileStatus>();
             for (int i = 0; (dirs != null && i < dirs.length); i++) {
@@ -460,9 +460,9 @@ public class MoveTask extends Task<MoveW
 
     boolean updateBucketCols = false;
     if (bucketCols != null) {
-      FileSystem fileSys = partn.getPartitionPath().getFileSystem(conf);
+      FileSystem fileSys = partn.getDataLocation().getFileSystem(conf);
       FileStatus[] fileStatus = HiveStatsUtils.getFileStatusRecurse(
-          partn.getPartitionPath(), 1, fileSys);
+          partn.getDataLocation(), 1, fileSys);
       // Verify the number of buckets equals the number of files
       // This will not hold for dynamic partitions where not every reducer produced a file for
       // those partitions.  In this case the table is not bucketed as Hive requires a files for

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java?rev=1561947&r1=1561946&r2=1561947&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java Tue Jan 28 05:48:03 2014
@@ -29,6 +29,7 @@ import org.apache.hadoop.hive.ql.exec.ve
 import org.apache.hadoop.hive.ql.exec.vector.VectorMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorSelectOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorSMBMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.CollectDesc;
@@ -121,6 +122,7 @@ public final class OperatorFactory {
     vectorOpvec.add(new OpTuple<SelectDesc>(SelectDesc.class, VectorSelectOperator.class));
     vectorOpvec.add(new OpTuple<GroupByDesc>(GroupByDesc.class, VectorGroupByOperator.class));
     vectorOpvec.add(new OpTuple<MapJoinDesc>(MapJoinDesc.class, VectorMapJoinOperator.class));
+    vectorOpvec.add(new OpTuple<SMBJoinDesc>(SMBJoinDesc.class, VectorSMBMapJoinOperator.class));
     vectorOpvec.add(new OpTuple<ReduceSinkDesc>(ReduceSinkDesc.class,
         VectorReduceSinkOperator.class));
     vectorOpvec.add(new OpTuple<FileSinkDesc>(FileSinkDesc.class, VectorFileSinkOperator.class));

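The factory change registers the new VectorSMBMapJoinOperator for SMBJoinDesc, presumably so a sort-merge-bucket map join in a vectorized plan resolves to a vectorized operator rather than the row-mode one. Conceptually vectorOpvec is a descriptor-class to operator-class lookup; the map-based form below is only an illustration of that idea, not the factory's actual data structure:

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class VectorOpRegistrySketch {
      // Descriptor class -> vectorized operator class.
      private static final Map<Class<?>, Class<?>> VECTOR_OPS = new LinkedHashMap<Class<?>, Class<?>>();

      static void register(Class<?> descClass, Class<?> opClass) {
        VECTOR_OPS.put(descClass, opClass);
      }

      static Class<?> vectorOperatorFor(Class<?> descClass) {
        // After this patch, an SMBJoinDesc entry would resolve to VectorSMBMapJoinOperator.
        return VECTOR_OPS.get(descClass);
      }
    }
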
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java?rev=1561947&r1=1561946&r2=1561947&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java Tue Jan 28 05:48:03 2014
@@ -226,6 +226,11 @@ public class SMBMapJoinOperator extends 
   public void cleanUpInputFileChangedOp() throws HiveException {
     inputFileChanged = true;
   }
+  
+  protected List<Object> smbJoinComputeKeys(Object row, byte alias) throws HiveException {
+    return JoinUtil.computeKeys(row, joinKeys[alias],
+          joinKeysObjectInspectors[alias]);
+  }
 
   @Override
   public void processOp(Object row, int tag) throws HiveException {
@@ -260,8 +265,8 @@ public class SMBMapJoinOperator extends 
     byte alias = (byte) tag;
 
     // compute keys and values as StandardObjects
-    ArrayList<Object> key = JoinUtil.computeKeys(row, joinKeys[alias],
-        joinKeysObjectInspectors[alias]);
+    List<Object> key = smbJoinComputeKeys(row, alias);
+
     List<Object> value = getFilteredValue(alias, row);
 
 
@@ -495,7 +500,7 @@ public class SMBMapJoinOperator extends 
     return smallestOne == null ? null : result;
   }
 
-  private boolean processKey(byte alias, ArrayList<Object> key)
+  private boolean processKey(byte alias, List<Object> key)
       throws HiveException {
     List<Object> keyWritable = keyWritables[alias];
     if (keyWritable == null) {

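Pulling the key computation into a protected smbJoinComputeKeys method (and widening the signatures from ArrayList to List) turns it into an overridable hook, presumably so the new VectorSMBMapJoinOperator can supply batch-aware key extraction. A stripped-down sketch of that template-method shape; the class and method names below are stand-ins:

    import java.util.Arrays;
    import java.util.List;

    class RowModeJoin {
      // Hook: subclasses may compute join keys differently (e.g. from a vectorized batch).
      protected List<Object> computeKeys(Object row, byte alias) {
        return Arrays.<Object>asList(alias, row);   // stand-in for JoinUtil.computeKeys(...)
      }

      void process(Object row, int tag) {
        List<Object> key = computeKeys(row, (byte) tag);
        // ... compare keys across aliases and emit joined rows ...
      }
    }

    class VectorModeJoin extends RowModeJoin {
      @Override
      protected List<Object> computeKeys(Object row, byte alias) {
        // A vectorized subclass would pull the key columns out of its current batch here.
        return super.computeKeys(row, alias);
      }
    }
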
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java?rev=1561947&r1=1561946&r2=1561947&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java Tue Jan 28 05:48:03 2014
@@ -177,7 +177,7 @@ public class SkewJoinHandler {
   void endGroup() throws IOException, HiveException {
     if (skewKeyInCurrentGroup) {
 
-      String specPath = conf.getBigKeysDirMap().get((byte) currBigKeyTag);
+      Path specPath = conf.getBigKeysDirMap().get((byte) currBigKeyTag);
       RowContainer<ArrayList<Object>> bigKey = (RowContainer)joinOp.storage[currBigKeyTag];
       Path outputPath = getOperatorOutputPath(specPath);
       FileSystem destFs = outputPath.getFileSystem(hconf);
@@ -258,7 +258,7 @@ public class SkewJoinHandler {
         }
 
         try {
-          String specPath = conf.getBigKeysDirMap().get((byte) bigKeyTbl);
+          Path specPath = conf.getBigKeysDirMap().get((byte) bigKeyTbl);
           Path bigKeyPath = getOperatorOutputPath(specPath);
           FileSystem fs = bigKeyPath.getFileSystem(hconf);
           delete(bigKeyPath, fs);
@@ -295,7 +295,7 @@ public class SkewJoinHandler {
         continue;
       }
 
-      String specPath = conf.getBigKeysDirMap().get(
+      Path specPath = conf.getBigKeysDirMap().get(
           Byte.valueOf((byte) bigKeyTbl));
       commitOutputPathToFinalPath(specPath, false);
       for (int smallKeyTbl = 0; smallKeyTbl < numAliases; smallKeyTbl++) {
@@ -311,7 +311,7 @@ public class SkewJoinHandler {
     }
   }
 
-  private void commitOutputPathToFinalPath(String specPath,
+  private void commitOutputPathToFinalPath(Path specPath,
       boolean ignoreNonExisting) throws IOException {
     Path outPath = getOperatorOutputPath(specPath);
     Path finalPath = getOperatorFinalPath(specPath);
@@ -334,14 +334,12 @@ public class SkewJoinHandler {
     }
   }
 
-  private Path getOperatorOutputPath(String specPath) throws IOException {
-    Path tmpPath = Utilities.toTempPath(specPath);
-    return new Path(tmpPath, Utilities.toTempPath(taskId));
+  private Path getOperatorOutputPath(Path specPath) throws IOException {
+    return new Path(Utilities.toTempPath(specPath), Utilities.toTempPath(taskId));
   }
 
-  private Path getOperatorFinalPath(String specPath) throws IOException {
-    Path tmpPath = Utilities.toTempPath(specPath);
-    return new Path(tmpPath, taskId);
+  private Path getOperatorFinalPath(Path specPath) throws IOException {
+    return new Path(Utilities.toTempPath(specPath), taskId);
   }
 
   public void setSkewJoinJobCounter(LongWritable skewjoinFollowupJobs) {

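The spec path for skewed big keys is now carried as a Path end to end, so the per-operator output and final paths are composed with Path's parent/child constructor instead of string concatenation. A minimal sketch of that composition, with a "_tmp." prefix as an assumed stand-in for Utilities.toTempPath:

    import org.apache.hadoop.fs.Path;

    public class SkewPathSketch {
      // Assumed equivalent of Utilities.toTempPath: prefix the last path component with "_tmp.".
      private static Path toTempPath(Path orig) {
        return new Path(orig.getParent(), "_tmp." + orig.getName());
      }

      static Path operatorOutputPath(Path specPath, String taskId) {
        return new Path(toTempPath(specPath), "_tmp." + taskId);   // uncommitted per-task file
      }

      static Path operatorFinalPath(Path specPath, String taskId) {
        return new Path(toTempPath(specPath), taskId);             // committed per-task file
      }
    }
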
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java?rev=1561947&r1=1561946&r2=1561947&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java Tue Jan 28 05:48:03 2014
@@ -171,9 +171,7 @@ public class StatsTask extends Task<Stat
 
         if (statsAggregator != null) {
           String prefix = getAggregationPrefix(counterStat, table, null);
-          String aggKey = Utilities.getHashedStatsPrefix(prefix, maxPrefixLength, 0);
-          updateStats(statsAggregator, parameters, aggKey, atomic);
-          statsAggregator.cleanUp(aggKey);
+          updateStats(statsAggregator, parameters, prefix, maxPrefixLength, atomic);
         }
 
         updateQuickStats(wh, parameters, tTable.getSd());
@@ -207,9 +205,7 @@ public class StatsTask extends Task<Stat
 
           if (statsAggregator != null) {
             String prefix = getAggregationPrefix(counterStat, table, partn);
-            String aggKey = Utilities.getHashedStatsPrefix(prefix, maxPrefixLength, 0);
-            updateStats(statsAggregator, parameters, aggKey, atomic);
-            statsAggregator.cleanUp(aggKey);
+            updateStats(statsAggregator, parameters, prefix, maxPrefixLength, atomic);
           }
 
           updateQuickStats(wh, parameters, tPart.getSd());
@@ -296,7 +292,10 @@ public class StatsTask extends Task<Stat
   }
 
   private void updateStats(StatsAggregator statsAggregator,
-      Map<String, String> parameters, String aggKey, boolean atomic) throws HiveException {
+      Map<String, String> parameters, String prefix, int maxPrefixLength, boolean atomic)
+      throws HiveException {
+
+    String aggKey = Utilities.getHashedStatsPrefix(prefix, maxPrefixLength);
 
     for (String statType : StatsSetupConst.statsRequireCompute) {
       String value = statsAggregator.aggregateStats(aggKey, statType);
@@ -317,6 +316,7 @@ public class StatsTask extends Task<Stat
         }
       }
     }
+    statsAggregator.cleanUp(aggKey);
   }
 
   private void updateQuickStats(Warehouse wh, Map<String, String> parameters,

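Both call sites above used to hash the prefix and clean up the aggregation key themselves; updateStats now takes the raw prefix plus the maximum prefix length, derives the key once, and calls cleanUp when it is done. A condensed sketch of the new shape, using a stand-in aggregator interface and a simplified hash in place of Utilities.getHashedStatsPrefix (which uses MD5):

    import java.util.Map;

    public class UpdateStatsSketch {
      // Stand-in for Hive's StatsAggregator.
      interface Aggregator {
        String aggregateStats(String aggKey, String statType);
        void cleanUp(String aggKey);
      }

      static void updateStats(Aggregator agg, Map<String, String> parameters,
          String prefix, int maxPrefixLength) {
        String aggKey = hashIfTooLong(prefix, maxPrefixLength);    // key derivation moved inside the helper
        String value = agg.aggregateStats(aggKey, "numRows");      // "numRows" is just an example stat type
        if (value != null) {
          parameters.put("numRows", value);
        }
        agg.cleanUp(aggKey);                                       // cleanup moved inside as well
      }

      private static String hashIfTooLong(String prefix, int max) {
        return (max >= 0 && prefix.length() > max) ? Integer.toHexString(prefix.hashCode()) : prefix;
      }
    }
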
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java?rev=1561947&r1=1561946&r2=1561947&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java Tue Jan 28 05:48:03 2014
@@ -291,14 +291,11 @@ public class TableScanOperator extends O
       statsToPublish.clear();
       String prefix = Utilities.join(conf.getStatsAggPrefix(), pspecs);
 
-      String key;
       int maxKeyLength = conf.getMaxStatsKeyPrefixLength();
-      if (statsPublisher instanceof CounterStatsPublisher) {
-        key = Utilities.getHashedStatsPrefix(prefix, maxKeyLength, 0);
-      } else {
+      String key = Utilities.getHashedStatsPrefix(prefix, maxKeyLength);
+      if (!(statsPublisher instanceof CounterStatsPublisher)) {
         // stats publisher except counter type needs postfix 'taskID'
-        prefix = Utilities.getHashedStatsPrefix(prefix, maxKeyLength, taskID.length());
-        key = prefix + taskID;
+        key = Utilities.join(prefix, taskID);
       }
       for(String statType : stats.get(pspecs).getStoredStats()) {
         statsToPublish.put(statType, Long.toString(stats.get(pspecs).getStat(statType)));

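The rewritten key construction hashes the prefix against the full maxKeyLength budget and then, for every publisher other than the counter type, joins the task ID onto the hashed prefix; the old code instead reserved taskID.length() characters before hashing and concatenated directly. A tiny sketch of the new flow, with the hashing and joining helpers reduced to illustrative stand-ins for Utilities.getHashedStatsPrefix and Utilities.join:

    public class StatsKeySketch {
      static String publishKey(String prefix, int maxKeyLength, String taskId, boolean counterPublisher) {
        String key = hashIfTooLong(prefix, maxKeyLength);   // hashed once, with no postfix budget
        if (!counterPublisher) {
          key = join(key, taskId);                          // only non-counter publishers carry the task ID
        }
        return key;
      }

      private static String hashIfTooLong(String prefix, int max) {
        return (max >= 0 && prefix.length() > max) ? Integer.toHexString(prefix.hashCode()) : prefix;
      }

      private static String join(String a, String b) {
        return a.endsWith("/") ? a + b : a + "/" + b;       // assumed separator behaviour
      }
    }
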
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1561947&r1=1561946&r2=1561947&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Tue Jan 28 05:48:03 2014
@@ -70,6 +70,10 @@ import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
+import java.util.zip.Deflater;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.Inflater;
+import java.util.zip.InflaterInputStream;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -95,7 +99,6 @@ import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.common.HiveInterruptCallback;
 import org.apache.hadoop.hive.common.HiveInterruptUtils;
 import org.apache.hadoop.hive.common.HiveStatsUtils;
-import org.apache.hadoop.hive.common.ObjectPair;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.Warehouse;
@@ -133,12 +136,9 @@ import org.apache.hadoop.hive.ql.metadat
 import org.apache.hadoop.hive.ql.metadata.InputEstimator;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
-import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
-import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.GroupByDesc;
@@ -184,8 +184,6 @@ import com.esotericsoftware.kryo.io.Inpu
 import com.esotericsoftware.kryo.io.Output;
 import com.esotericsoftware.kryo.serializers.FieldSerializer;
 
-import org.apache.commons.codec.binary.Base64;
-
 /**
  * Utilities.
  *
@@ -324,6 +322,7 @@ public final class Utilities {
           }
           byte[] planBytes = Base64.decodeBase64(planString);
           in = new ByteArrayInputStream(planBytes);
+          in = new InflaterInputStream(in);
         } else {
           in = new FileInputStream(localPath.toUri().getPath());
         }
@@ -554,22 +553,31 @@ public final class Utilities {
     }
   }
 
-  public static void setMapRedWork(Configuration conf, MapredWork w, String hiveScratchDir) {
+  public static class PathDelegate extends PersistenceDelegate {
+    @Override
+    protected Expression instantiate(Object oldInstance, Encoder out) {
+      Path p = (Path)oldInstance;
+      Object[] args = {p.toString()};
+      return new Expression(p, p.getClass(), "new", args);
+    }
+  }
+
+  public static void setMapRedWork(Configuration conf, MapredWork w, Path hiveScratchDir) {
     setMapWork(conf, w.getMapWork(), hiveScratchDir, true);
     if (w.getReduceWork() != null) {
       setReduceWork(conf, w.getReduceWork(), hiveScratchDir, true);
     }
   }
 
-  public static Path setMapWork(Configuration conf, MapWork w, String hiveScratchDir, boolean useCache) {
+  public static Path setMapWork(Configuration conf, MapWork w, Path hiveScratchDir, boolean useCache) {
     return setBaseWork(conf, w, hiveScratchDir, MAP_PLAN_NAME, useCache);
   }
 
-  public static Path setReduceWork(Configuration conf, ReduceWork w, String hiveScratchDir, boolean useCache) {
+  public static Path setReduceWork(Configuration conf, ReduceWork w, Path hiveScratchDir, boolean useCache) {
     return setBaseWork(conf, w, hiveScratchDir, REDUCE_PLAN_NAME, useCache);
   }
 
-  private static Path setBaseWork(Configuration conf, BaseWork w, String hiveScratchDir, String name, boolean useCache) {
+  private static Path setBaseWork(Configuration conf, BaseWork w, Path hiveScratchDir, String name, boolean useCache) {
     try {
       setPlanPath(conf, hiveScratchDir);
 
@@ -579,11 +587,12 @@ public final class Utilities {
 
       if (HiveConf.getBoolVar(conf, ConfVars.HIVE_RPC_QUERY_PLAN)) {
         // add it to the conf
-        out = new ByteArrayOutputStream();
+        ByteArrayOutputStream byteOut = new ByteArrayOutputStream();
+        out = new DeflaterOutputStream(byteOut, new Deflater(Deflater.BEST_SPEED));
         serializePlan(w, out, conf);
         LOG.info("Setting plan: "+planPath.toUri().getPath());
         conf.set(planPath.toUri().getPath(),
-            Base64.encodeBase64String(((ByteArrayOutputStream)out).toByteArray()));
+            Base64.encodeBase64String(byteOut.toByteArray()));
       } else {
         // use the default file system of the conf
         FileSystem fs = planPath.getFileSystem(conf);
@@ -626,7 +635,7 @@ public final class Utilities {
     return new Path(planPath, name);
   }
 
-  private static void setPlanPath(Configuration conf, String hiveScratchDir) throws IOException {
+  private static void setPlanPath(Configuration conf, Path hiveScratchDir) throws IOException {
     if (getPlanPath(conf) == null) {
       // this is the unique conf ID, which is kept in JobConf as part of the plan file name
       String jobID = UUID.randomUUID().toString();
@@ -814,8 +823,9 @@ public final class Utilities {
   /**
    * Deserializes the plan.
    * @param in The stream to read from.
+   * @param planClass class of plan
+   * @param conf configuration
    * @return The plan, such as QueryPlan, MapredWork, etc.
-   * @param To know what serialization format plan is in
    */
   public static <T> T deserializePlan(InputStream in, Class<T> planClass, Configuration conf) {
     return deserializePlan(in, planClass, conf, false);
@@ -860,6 +870,7 @@ public final class Utilities {
     e.setPersistenceDelegate(org.datanucleus.store.types.backed.Map.class, new MapDelegate());
     e.setPersistenceDelegate(org.datanucleus.store.types.backed.List.class, new ListDelegate());
     e.setPersistenceDelegate(CommonToken.class, new CommonTokenDelegate());
+    e.setPersistenceDelegate(Path.class, new PathDelegate());
 
     e.writeObject(plan);
     e.close();
@@ -1420,10 +1431,6 @@ public final class Utilities {
     return new Path(orig.getParent(), taskTmpPrefix + orig.getName());
   }
 
-  public static Path toTaskTempPath(String orig) {
-    return toTaskTempPath(new Path(orig));
-  }
-
   public static Path toTempPath(Path orig) {
     if (orig.getName().indexOf(tmpPrefix) == 0) {
       return orig;
@@ -1682,15 +1689,14 @@ public final class Utilities {
     }
   }
 
-  public static void mvFileToFinalPath(String specPath, Configuration hconf,
+  public static void mvFileToFinalPath(Path specPath, Configuration hconf,
       boolean success, Log log, DynamicPartitionCtx dpCtx, FileSinkDesc conf,
       Reporter reporter) throws IOException,
       HiveException {
 
-    FileSystem fs = (new Path(specPath)).getFileSystem(hconf);
+    FileSystem fs = specPath.getFileSystem(hconf);
     Path tmpPath = Utilities.toTempPath(specPath);
     Path taskTmpPath = Utilities.toTaskTempPath(specPath);
-    Path finalPath = new Path(specPath);
     if (success) {
       if (fs.exists(tmpPath)) {
         // remove any tmp file or double-committed output files
@@ -1702,8 +1708,8 @@ public final class Utilities {
         }
 
         // move to the file destination
-        log.info("Moving tmp dir: " + tmpPath + " to: " + finalPath);
-        Utilities.renameOrMoveFiles(fs, tmpPath, finalPath);
+        log.info("Moving tmp dir: " + tmpPath + " to: " + specPath);
+        Utilities.renameOrMoveFiles(fs, tmpPath, specPath);
       }
     } else {
       fs.delete(tmpPath, true);
@@ -2004,6 +2010,25 @@ public final class Utilities {
     return names;
   }
 
+  /**
+   * Extract the db and table name from a dbtable string, where db and table are separated by ".".
+   * If there is no db name part, the current session's default db is used.
+   * @param dbtable table name, optionally qualified as "db.table"
+   * @return String array with two elements: first is the db name, second is the table name
+   * @throws HiveException
+   */
+  public static String[] getDbTableName(String dbtable) throws HiveException {
+    String[] names = dbtable.split("\\.");
+    switch (names.length) {
+    case 2:
+      return names;
+    case 1:
+      return new String [] {SessionState.get().getCurrentDatabase(), dbtable};
+    default:
+      throw new HiveException(ErrorMsg.INVALID_TABLE_NAME, dbtable);
+    }
+  }
+
   public static void validateColumnNames(List<String> colNames, List<String> checkCols)
       throws SemanticException {
     Iterator<String> checkColsIter = checkCols.iterator();
@@ -2181,6 +2206,8 @@ public final class Utilities {
                   }
                   resultMap.put(pathStr, new ContentSummary(total, -1, -1));
                 }
+                // todo: should nullify the summary for non-native tables,
+                // so they are not selected as a mapjoin target
                 FileSystem fs = p.getFileSystem(myConf);
                 resultMap.put(pathStr, fs.getContentSummary(p));
               } catch (Exception e) {
@@ -2242,6 +2269,23 @@ public final class Utilities {
     }
   }
 
+  // return the sum of sizes of all given aliases except one; returns -1 if any other alias size is unknown
+  public static long sumOfExcept(Map<String, Long> aliasToSize,
+      Set<String> aliases, String except) {
+    long total = 0;
+    for (String alias : aliases) {
+      if (alias.equals(except)) {
+        continue;
+      }
+      Long size = aliasToSize.get(alias);
+      if (size == null) {
+        return -1;
+      }
+      total += size;
+    }
+    return total;
+  }
+
   public static boolean isEmptyPath(JobConf job, Path dirPath, Context ctx)
       throws Exception {
     ContentSummary cs = ctx.getCS(dirPath);
@@ -2315,7 +2359,7 @@ public final class Utilities {
       DynamicPartitionCtx dpCtx) throws HiveException {
 
     try {
-      Path loadPath = new Path(dpCtx.getRootPath());
+      Path loadPath = dpCtx.getRootPath();
       FileSystem fs = loadPath.getFileSystem(conf);
       int numDPCols = dpCtx.getNumDPCols();
       FileStatus[] status = HiveStatsUtils.getFileStatusRecurse(loadPath, numDPCols, fs);
@@ -2360,16 +2404,15 @@ public final class Utilities {
    * then it returns an MD5 hash of statsPrefix followed by path separator, otherwise
    * it returns statsPrefix
    *
-   * @param statsPrefix
-   * @param maxPrefixLength
-   * @return
+   * @param statsPrefix prefix of the stats key
+   * @param maxPrefixLength maximum allowed length of the stats key prefix
+   * @return the MD5-hashed prefix if statsPrefix is longer than maxPrefixLength, otherwise statsPrefix
    */
-  public static String getHashedStatsPrefix(String statsPrefix,
-      int maxPrefixLength, int postfixLength) {
+  public static String getHashedStatsPrefix(String statsPrefix, int maxPrefixLength) {
     // todo: this might return possibly longer prefix than
     // maxPrefixLength (if set) when maxPrefixLength - postfixLength < 17,
     // which would make stat values invalid (especially for 'counter' type)
-    if (maxPrefixLength >= 0 && statsPrefix.length() > maxPrefixLength - postfixLength) {
+    if (maxPrefixLength >= 0 && statsPrefix.length() > maxPrefixLength) {
       try {
         MessageDigest digester = MessageDigest.getInstance("MD5");
         digester.update(statsPrefix.getBytes());
@@ -2441,28 +2484,12 @@ public final class Utilities {
     jobConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypesString);
   }
 
-  public static void validatePartSpecColumnNames(Table tbl, Map<String, String> partSpec)
-      throws SemanticException {
-
-    List<FieldSchema> parts = tbl.getPartitionKeys();
-    Set<String> partCols = new HashSet<String>(parts.size());
-    for (FieldSchema col : parts) {
-      partCols.add(col.getName());
-    }
-    for (String col : partSpec.keySet()) {
-      if (!partCols.contains(col)) {
-        throw new SemanticException(ErrorMsg.NONEXISTPARTCOL.getMsg(col));
-      }
-    }
-  }
-
   public static String suffix = ".hashtable";
 
-  public static String generatePath(String baseURI, String dumpFilePrefix,
+  public static Path generatePath(Path basePath, String dumpFilePrefix,
       Byte tag, String bigBucketFileName) {
-    String path = new String(baseURI + Path.SEPARATOR + "MapJoin-" + dumpFilePrefix + tag +
+    return new Path(basePath, "MapJoin-" + dumpFilePrefix + tag +
       "-" + bigBucketFileName + suffix);
-    return path;
   }
 
   public static String generateFileName(Byte tag, String bigBucketFileName) {
@@ -2470,24 +2497,16 @@ public final class Utilities {
     return fileName;
   }
 
-  public static String generateTmpURI(String baseURI, String id) {
-    String tmpFileURI = new String(baseURI + Path.SEPARATOR + "HashTable-" + id);
-    return tmpFileURI;
+  public static Path generateTmpPath(Path basePath, String id) {
+    return new Path(basePath, "HashTable-" + id);
   }
 
-  public static String generateTarURI(String baseURI, String filename) {
-    String tmpFileURI = new String(baseURI + Path.SEPARATOR + filename + ".tar.gz");
-    return tmpFileURI;
-  }
-
-  public static String generateTarURI(Path baseURI, String filename) {
-    String tmpFileURI = new String(baseURI + Path.SEPARATOR + filename + ".tar.gz");
-    return tmpFileURI;
+  public static Path generateTarPath(Path basePath, String filename) {
+    return new Path(basePath, filename + ".tar.gz");
   }
 
   public static String generateTarFileName(String name) {
-    String tmpFileURI = new String(name + ".tar.gz");
-    return tmpFileURI;
+    return name + ".tar.gz";
   }
 
   public static String generatePath(Path baseURI, String filename) {
@@ -2898,6 +2917,16 @@ public final class Utilities {
   }
 
   /**
+   * On Tez we're not creating dummy files when getting/setting input paths.
+   * We let Tez handle the situation. We're also setting the paths in the AM
+   * so we don't want to depend on scratch dir and context.
+   */
+  public static List<Path> getInputPathsTez(JobConf job, MapWork work) throws Exception {
+    List<Path> paths = getInputPaths(job, work, null, null);
+    return paths;
+  }
+
+  /**
    * Computes a list of all input paths needed to compute the given MapWork. All aliases
    * are considered and a merged list of input paths is returned. If any input path points
    * to an empty table or partition a dummy file in the scratch dir is instead created and
@@ -2911,7 +2940,7 @@ public final class Utilities {
    * @return List of paths to process for the given MapWork
    * @throws Exception
    */
-  public static List<Path> getInputPaths(JobConf job, MapWork work, String hiveScratchDir, Context ctx)
+  public static List<Path> getInputPaths(JobConf job, MapWork work, Path hiveScratchDir, Context ctx)
       throws Exception {
     int sequenceNumber = 0;
 
@@ -2966,7 +2995,7 @@ public final class Utilities {
   }
 
   @SuppressWarnings({"rawtypes", "unchecked"})
-  private static Path createEmptyFile(String hiveScratchDir,
+  private static Path createEmptyFile(Path hiveScratchDir,
       Class<? extends HiveOutputFormat> outFileFormat, JobConf job,
       int sequenceNumber, Properties props, boolean dummyRow)
           throws IOException, InstantiationException, IllegalAccessException {
@@ -2983,7 +3012,6 @@ public final class Utilities {
     String newFile = newDir + File.separator + "emptyFile";
     Path newFilePath = new Path(newFile);
 
-    String onefile = newPath.toString();
     FSRecordWriter recWriter = outFileFormat.newInstance().getHiveRecordWriter(job, newFilePath,
         Text.class, false, props, null);
     if (dummyRow) {
@@ -2999,7 +3027,7 @@ public final class Utilities {
 
   @SuppressWarnings("rawtypes")
   private static Path createDummyFileForEmptyPartition(Path path, JobConf job, MapWork work,
-      String hiveScratchDir, String alias, int sequenceNumber)
+      Path hiveScratchDir, String alias, int sequenceNumber)
           throws IOException, InstantiationException, IllegalAccessException {
 
     String strPath = path.toString();
@@ -3041,7 +3069,7 @@ public final class Utilities {
 
   @SuppressWarnings("rawtypes")
   private static Path createDummyFileForEmptyTable(JobConf job, MapWork work,
-      String hiveScratchDir, String alias, int sequenceNumber)
+      Path hiveScratchDir, String alias, int sequenceNumber)
           throws IOException, InstantiationException, IllegalAccessException {
 
     TableDesc tableDesc = work.getAliasToPartnInfo().get(alias).getTableDesc();
@@ -3169,10 +3197,10 @@ public final class Utilities {
 
       if (op instanceof FileSinkOperator) {
         FileSinkDesc fdesc = ((FileSinkOperator) op).getConf();
-        String tempDir = fdesc.getDirName();
+        Path tempDir = fdesc.getDirName();
 
         if (tempDir != null) {
-          Path tempPath = Utilities.toTempPath(new Path(tempDir));
+          Path tempPath = Utilities.toTempPath(tempDir);
           FileSystem fs = tempPath.getFileSystem(conf);
           fs.mkdirs(tempPath);
         }
@@ -3184,10 +3212,24 @@ public final class Utilities {
     }
   }
 
-  public static void clearWorkMap() {
+  /**
+   * Returns true if a plan is both configured for vectorized execution
+   * and vectorization is allowed. The plan may be configured for vectorization
+   * but vectorization disallowed, e.g. for FetchOperator execution.
+   */
+  public static boolean isVectorMode(Configuration conf) {
+    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED) &&
+        Utilities.getPlanPath(conf) != null && Utilities
+        .getMapRedWork(conf).getMapWork().getVectorMode()) {
+      return true;
+    }
+    return false;
+  }
+
+  public static void clearWorkMap() {
     gWorkMap.clear();
   }
-
+
   /**
    * Create a temp dir in specified baseDir
    * This can go away once hive moves to support only JDK 7

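Alongside the many String-to-Path signature changes, the most notable behavioural change in the Utilities hunks is the plan transport: when HIVE_RPC_QUERY_PLAN is set, the serialized plan is now deflated before being Base64-encoded into the configuration, and inflated again after decoding on the read side. A self-contained round-trip sketch of that encode/decode pair; the patch streams serializePlan straight into the DeflaterOutputStream, whereas this sketch takes an already serialized byte array:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.zip.Deflater;
    import java.util.zip.DeflaterOutputStream;
    import java.util.zip.InflaterInputStream;

    import org.apache.commons.codec.binary.Base64;

    public class PlanTransportSketch {
      static String encode(byte[] serializedPlan) throws IOException {
        ByteArrayOutputStream byteOut = new ByteArrayOutputStream();
        DeflaterOutputStream out = new DeflaterOutputStream(byteOut, new Deflater(Deflater.BEST_SPEED));
        out.write(serializedPlan);                         // compress the serialized plan
        out.close();                                       // finish the deflater and flush
        return Base64.encodeBase64String(byteOut.toByteArray());
      }

      static InputStream decode(String planString) {
        byte[] planBytes = Base64.decodeBase64(planString);
        return new InflaterInputStream(new ByteArrayInputStream(planBytes));   // decompress on read
      }
    }
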
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java?rev=1561947&r1=1561946&r2=1561947&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java Tue Jan 28 05:48:03 2014
@@ -203,7 +203,6 @@ public class ExecDriver extends Task<Map
 
     Context ctx = driverContext.getCtx();
     boolean ctxCreated = false;
-    String emptyScratchDirStr;
     Path emptyScratchDir;
 
     MapWork mWork = work.getMapWork();
@@ -215,8 +214,7 @@ public class ExecDriver extends Task<Map
         ctxCreated = true;
       }
 
-      emptyScratchDirStr = ctx.getMRTmpFileURI();
-      emptyScratchDir = new Path(emptyScratchDirStr);
+      emptyScratchDir = ctx.getMRTmpPath();
       FileSystem fs = emptyScratchDir.getFileSystem(job);
       fs.mkdirs(emptyScratchDir);
     } catch (IOException e) {
@@ -331,8 +329,8 @@ public class ExecDriver extends Task<Map
       MapredLocalWork localwork = mWork.getMapLocalWork();
       if (localwork != null) {
         if (!ShimLoader.getHadoopShims().isLocalMode(job)) {
-          Path localPath = new Path(localwork.getTmpFileURI());
-          Path hdfsPath = new Path(mWork.getTmpHDFSFileURI());
+          Path localPath = localwork.getTmpPath();
+          Path hdfsPath = mWork.getTmpHDFSPath();
 
           FileSystem hdfs = hdfsPath.getFileSystem(job);
           FileSystem localFS = localPath.getFileSystem(job);
@@ -345,19 +343,16 @@ public class ExecDriver extends Task<Map
           }
 
           //package and compress all the hashtable files to an archive file
-          String parentDir = localPath.toUri().getPath();
           String stageId = this.getId();
-          String archiveFileURI = Utilities.generateTarURI(parentDir, stageId);
           String archiveFileName = Utilities.generateTarFileName(stageId);
           localwork.setStageID(stageId);
 
-          CompressionUtils.tar(parentDir, fileNames,archiveFileName);
-          Path archivePath = new Path(archiveFileURI);
-          LOG.info("Archive "+ hashtableFiles.length+" hash table files to " + archiveFileURI);
+          CompressionUtils.tar(localPath.toUri().getPath(), fileNames,archiveFileName);
+          Path archivePath = Utilities.generateTarPath(localPath, stageId);
+          LOG.info("Archive "+ hashtableFiles.length+" hash table files to " + archivePath);
 
           //upload archive file to hdfs
-          String hdfsFile =Utilities.generateTarURI(hdfsPath, stageId);
-          Path hdfsFilePath = new Path(hdfsFile);
+          Path hdfsFilePath = Utilities.generateTarPath(hdfsPath, stageId);
           short replication = (short) job.getInt("mapred.submit.replication", 10);
           hdfs.setReplication(hdfsFilePath, replication);
           hdfs.copyFromLocalFile(archivePath, hdfsFilePath);
@@ -370,10 +365,10 @@ public class ExecDriver extends Task<Map
         }
       }
       work.configureJobConf(job);
-      List<Path> inputPaths = Utilities.getInputPaths(job, mWork, emptyScratchDirStr, ctx);
+      List<Path> inputPaths = Utilities.getInputPaths(job, mWork, emptyScratchDir, ctx);
       Utilities.setInputPaths(job, inputPaths);
 
-      Utilities.setMapRedWork(job, work, ctx.getMRTmpFileURI());
+      Utilities.setMapRedWork(job, work, ctx.getMRTmpPath());
 
       if (mWork.getSamplingType() > 0 && rWork != null && rWork.getNumReduceTasks() > 1) {
         try {
@@ -502,8 +497,8 @@ public class ExecDriver extends Task<Map
     for (String path : paths) {
       inputPaths.add(new Path(path));
     }
-    
-    String tmpPath = context.getCtx().getExternalTmpFileURI(inputPaths.get(0).toUri());
+
+    Path tmpPath = context.getCtx().getExternalTmpPath(inputPaths.get(0).toUri());
     Path partitionFile = new Path(tmpPath, ".partitions");
     ShimLoader.getHadoopShims().setTotalOrderPartitionFile(job, partitionFile);
     PartitionKeySampler sampler = new PartitionKeySampler();
@@ -744,7 +739,7 @@ public class ExecDriver extends Task<Map
   public static String generateCmdLine(HiveConf hconf, Context ctx)
       throws IOException {
     HiveConf tempConf = new HiveConf();
-    Path hConfFilePath = new Path(ctx.getLocalTmpFileURI(), JOBCONF_FILENAME);
+    Path hConfFilePath = new Path(ctx.getLocalTmpPath(), JOBCONF_FILENAME);
     OutputStream out = null;
 
     Properties deltaP = hconf.getChangedProperties();


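In ExecDriver the local-work tar handling now composes Paths directly: the stage archive sits next to the local hash-table dump directory and is uploaded to the HDFS temp path, with Utilities.generateTarPath used on both sides. A small sketch of that upload step; unlike the hunk above it sets the replication factor after the copy, and the helper simply mirrors the new parent-plus-"<stageId>.tar.gz" composition:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HashTableArchiveSketch {
      // Mirrors the new Utilities.generateTarPath(basePath, name): <basePath>/<name>.tar.gz
      static Path tarPath(Path basePath, String name) {
        return new Path(basePath, name + ".tar.gz");
      }

      static void upload(Configuration job, Path localDir, Path hdfsDir, String stageId, short replication)
          throws IOException {
        Path archivePath = tarPath(localDir, stageId);     // tarball produced locally (e.g. by CompressionUtils.tar)
        Path hdfsFilePath = tarPath(hdfsDir, stageId);
        FileSystem hdfs = hdfsDir.getFileSystem(job);
        hdfs.copyFromLocalFile(archivePath, hdfsFilePath); // ship the archive to the cluster
        hdfs.setReplication(hdfsFilePath, replication);    // widen replication so all tasks can fetch it
      }
    }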
