Subject: svn commit: r1561947 [9/17] - in /hive/branches/tez: ./ ant/ ant/src/org/apache/hadoop/hive/ant/ beeline/ cli/ cli/src/java/org/apache/hadoop/hive/cli/ common/ common/src/ common/src/java/org/apache/hadoop/hive/common/type/ common/src/java/org/apache/h...
Date: Tue, 28 Jan 2014 05:48:10 -0000
To: commits@hive.apache.org
From: gunther@apache.org
Message-Id: <20140128054826.C75252388B34@eris.apache.org>

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/Driver.java?rev=1561947&r1=1561946&r2=1561947&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/Driver.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/Driver.java Tue Jan 28 05:48:03 2014
@@ -55,6 +55,7 @@ import org.apache.hadoop.hive.ql.exec.Ta
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
 import org.apache.hadoop.hive.ql.hooks.Entity;
+import org.apache.hadoop.hive.ql.hooks.Entity.Type;
 import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
 import org.apache.hadoop.hive.ql.hooks.Hook;
 import org.apache.hadoop.hive.ql.hooks.HookContext;
@@ -101,6 +102,9 @@ import org.apache.hadoop.hive.ql.plan.Op
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.processors.CommandProcessor;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject.HivePrivilegeObjectType;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.hadoop.hive.serde2.ByteStream;
@@ -523,57 +527,62 @@ public class Driver implements CommandPr
SessionState ss = SessionState.get(); HiveOperation op = ss.getHiveOperation(); Hive db = sem.getDb(); - if (op != null) { - if (op.equals(HiveOperation.CREATEDATABASE)) { - ss.getAuthorizer().authorize( - op.getInputRequiredPrivileges(), op.getOutputRequiredPrivileges()); - } else if (op.equals(HiveOperation.CREATETABLE_AS_SELECT) - || op.equals(HiveOperation.CREATETABLE)) { - ss.getAuthorizer().authorize( - db.getDatabase(SessionState.get().getCurrentDatabase()), null, - HiveOperation.CREATETABLE_AS_SELECT.getOutputRequiredPrivileges()); - } else { - if (op.equals(HiveOperation.IMPORT)) { - ImportSemanticAnalyzer isa = (ImportSemanticAnalyzer) sem; - if (!isa.existsTable()) { - ss.getAuthorizer().authorize( - db.getDatabase(SessionState.get().getCurrentDatabase()), null, - HiveOperation.CREATETABLE_AS_SELECT.getOutputRequiredPrivileges()); - } + if (ss.isAuthorizationModeV2()) { + doAuthorizationV2(ss, op, inputs, outputs); + return; + } + + if (op == null) { + throw new HiveException("Operation should not be null"); + } + if (op.equals(HiveOperation.CREATEDATABASE)) { + ss.getAuthorizer().authorize( + op.getInputRequiredPrivileges(), op.getOutputRequiredPrivileges()); + } else if (op.equals(HiveOperation.CREATETABLE_AS_SELECT) + || op.equals(HiveOperation.CREATETABLE)) { + ss.getAuthorizer().authorize( + db.getDatabase(SessionState.get().getCurrentDatabase()), null, + HiveOperation.CREATETABLE_AS_SELECT.getOutputRequiredPrivileges()); + } else { + if (op.equals(HiveOperation.IMPORT)) { + ImportSemanticAnalyzer isa = (ImportSemanticAnalyzer) sem; + if (!isa.existsTable()) { + ss.getAuthorizer().authorize( + db.getDatabase(SessionState.get().getCurrentDatabase()), null, + HiveOperation.CREATETABLE_AS_SELECT.getOutputRequiredPrivileges()); } } - if (outputs != null && outputs.size() > 0) { - for (WriteEntity write : outputs) { - if (write.getType() == Entity.Type.DATABASE) { - ss.getAuthorizer().authorize(write.getDatabase(), - null, op.getOutputRequiredPrivileges()); - continue; - } - - if (write.getType() == WriteEntity.Type.PARTITION) { - Partition part = db.getPartition(write.getTable(), write - .getPartition().getSpec(), false); - if (part != null) { - ss.getAuthorizer().authorize(write.getPartition(), null, - op.getOutputRequiredPrivileges()); - continue; - } - } + } + if (outputs != null && outputs.size() > 0) { + for (WriteEntity write : outputs) { + if (write.getType() == Entity.Type.DATABASE) { + ss.getAuthorizer().authorize(write.getDatabase(), + null, op.getOutputRequiredPrivileges()); + continue; + } - if (write.getTable() != null) { - ss.getAuthorizer().authorize(write.getTable(), null, + if (write.getType() == WriteEntity.Type.PARTITION) { + Partition part = db.getPartition(write.getTable(), write + .getPartition().getSpec(), false); + if (part != null) { + ss.getAuthorizer().authorize(write.getPartition(), null, op.getOutputRequiredPrivileges()); + continue; } } + if (write.getTable() != null) { + ss.getAuthorizer().authorize(write.getTable(), null, + op.getOutputRequiredPrivileges()); + } } } if (inputs != null && inputs.size() > 0) { - Map> tab2Cols = new HashMap>(); Map> part2Cols = new HashMap>(); + //determine if partition level privileges should be checked for input tables Map tableUsePartLevelAuth = new HashMap(); for (ReadEntity read : inputs) { if (read.getType() == Entity.Type.DATABASE) { @@ -596,6 +605,8 @@ public class Driver implements CommandPr } } + //for a select or create-as-select query, populate the partition to column (par2Cols) or + // table to columns 
mapping (tab2Cols) if (op.equals(HiveOperation.CREATETABLE_AS_SELECT) || op.equals(HiveOperation.QUERY)) { SemanticAnalyzer querySem = (SemanticAnalyzer) sem; @@ -691,6 +702,49 @@ public class Driver implements CommandPr } } + private void doAuthorizationV2(SessionState ss, HiveOperation op, HashSet inputs, + HashSet outputs) { + HiveOperationType hiveOpType = getHiveOperationType(op); + List inputsHObjs = getHivePrivObjects(inputs); + List outputHObjs = getHivePrivObjects(outputs); + ss.getAuthorizerV2().checkPrivileges(hiveOpType, inputsHObjs, outputHObjs); + return; + } + + private List getHivePrivObjects(HashSet inputs) { + List hivePrivobjs = new ArrayList(); + for(Entity input : inputs){ + HivePrivilegeObjectType privObjType = getHivePrivilegeObjectType(input.getType()); + //support for authorization on partitions or uri needs to be added + HivePrivilegeObject hPrivObject = new HivePrivilegeObject(privObjType, + input.getDatabase().getName(), + input.getTable().getTableName()); + hivePrivobjs.add(hPrivObject); + } + return hivePrivobjs; + } + + private HivePrivilegeObjectType getHivePrivilegeObjectType(Type type) { + switch(type){ + case DATABASE: + return HivePrivilegeObjectType.DATABASE; + case TABLE: + return HivePrivilegeObjectType.TABLE; + case LOCAL_DIR: + case DFS_DIR: + return HivePrivilegeObjectType.URI; + case PARTITION: + case DUMMYPARTITION: //need to determine if a different type is needed for dummy partitions + return HivePrivilegeObjectType.PARTITION; + default: + return null; + } + } + + private HiveOperationType getHiveOperationType(HiveOperation op) { + return HiveOperationType.valueOf(op.name()); + } + /** * @return The current query plan associated with this Driver, if any. */ @@ -1234,6 +1288,8 @@ public class Driver implements CommandPr Map running = new HashMap(); DriverContext driverCxt = new DriverContext(runnable, ctx); + driverCxt.prepare(plan); + ctx.setHDFSCleanup(true); SessionState.get().setLastMapRedStatsList(new ArrayList()); @@ -1313,6 +1369,8 @@ public class Driver implements CommandPr } } + driverCxt.finished(tskRun); + if (SessionState.get() != null) { SessionState.get().getHiveHistory().setTaskProperty(queryId, tsk.getId(), Keys.TASK_RET_CODE, String.valueOf(exitVal)); @@ -1474,6 +1532,8 @@ public class Driver implements CommandPr TaskResult tskRes = new TaskResult(); TaskRunner tskRun = new TaskRunner(tsk, tskRes); + cxt.prepare(tskRun); + // Launch Task if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.EXECPARALLEL) && tsk.isMapRedTask()) { // Launch it in the parallel mode, as a separate thread only for MR tasks Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java?rev=1561947&r1=1561946&r2=1561947&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java Tue Jan 28 05:48:03 2014 @@ -18,13 +18,25 @@ package org.apache.hadoop.hive.ql; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.exec.NodeUtils; +import org.apache.hadoop.hive.ql.exec.NodeUtils.Function; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.StatsTask; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.TaskRunner; +import 
org.apache.hadoop.hive.ql.exec.mr.MapRedTask; +import org.apache.hadoop.hive.ql.plan.MapWork; +import org.apache.hadoop.hive.ql.plan.ReduceWork; + import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; import java.util.LinkedList; +import java.util.List; +import java.util.Map; import java.util.Queue; -import org.apache.hadoop.hive.ql.exec.Task; -import org.apache.hadoop.mapred.JobConf; - /** * DriverContext. * @@ -38,6 +50,8 @@ public class DriverContext { Context ctx; + final Map statsTasks = new HashMap(1); + public DriverContext() { this.runnable = null; this.ctx = null; @@ -82,5 +96,42 @@ public class DriverContext { public void incCurJobNo(int amount) { this.curJobNo = this.curJobNo + amount; } - + + public void prepare(QueryPlan plan) { + // extract stats keys from StatsTask + List> rootTasks = plan.getRootTasks(); + NodeUtils.iterateTask(rootTasks, StatsTask.class, new Function() { + public void apply(StatsTask statsTask) { + statsTasks.put(statsTask.getWork().getAggKey(), statsTask); + } + }); + } + + public void prepare(TaskRunner runner) { + } + + public void finished(TaskRunner runner) { + if (statsTasks.isEmpty() || !(runner.getTask() instanceof MapRedTask)) { + return; + } + MapRedTask mapredTask = (MapRedTask) runner.getTask(); + + MapWork mapWork = mapredTask.getWork().getMapWork(); + ReduceWork reduceWork = mapredTask.getWork().getReduceWork(); + List operators = new ArrayList(mapWork.getAliasToWork().values()); + if (reduceWork != null) { + operators.add(reduceWork.getReducer()); + } + final List statKeys = new ArrayList(1); + NodeUtils.iterate(operators, FileSinkOperator.class, new Function() { + public void apply(FileSinkOperator fsOp) { + if (fsOp.getConf().isGatherStats()) { + statKeys.add(fsOp.getConf().getStatsAggPrefix()); + } + } + }); + for (String statKey : statKeys) { + statsTasks.get(statKey).getWork().setSourceTask(mapredTask); + } + } } Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java?rev=1561947&r1=1561946&r2=1561947&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java Tue Jan 28 05:48:03 2014 @@ -45,7 +45,7 @@ public enum ErrorMsg { // 30000 to 39999: Runtime errors which Hive thinks may be transient and retrying may succeed. // 40000 to 49999: Errors where Hive is unable to advise about retries. // In addition to the error code, ErrorMsg also has a SQLState field. - // SQLStates are taken from Section 12.5 of ISO-9075. + // SQLStates are taken from Section 22.1 of ISO-9075. // See http://www.contrib.andrew.cmu.edu/~shadow/sql/sql1992.txt // Most will just rollup to the generic syntax error state of 42000, but // specific errors can override the that state. 
@@ -53,6 +53,7 @@ public enum ErrorMsg { // http://dev.mysql.com/doc/refman/5.0/en/connector-j-reference-error-sqlstates.html GENERIC_ERROR(40000, "Exception while processing"), + //========================== 10000 range starts here ========================// INVALID_TABLE(10001, "Table not found", "42S02"), INVALID_COLUMN(10002, "Invalid column reference"), INVALID_INDEX(10003, "Invalid index"), @@ -370,7 +371,11 @@ public enum ErrorMsg { INVALID_DIR(10252, "{0} is not a directory", true), NO_VALID_LOCATIONS(10253, "Could not find any valid location to place the jars. " + "Please update hive.jar.directory or hive.user.install.directory with a valid location", false), + UNNSUPPORTED_AUTHORIZATION_PRINCIPAL_TYPE_GROUP(10254, + "Principal type GROUP is not supported in this authorization setting", "28000"), + INVALID_TABLE_NAME(10255, "Invalid table name {0}", true), + //========================== 20000 range starts here ========================// SCRIPT_INIT_ERROR(20000, "Unable to initialize custom script."), SCRIPT_IO_ERROR(20001, "An error occurred while reading or writing to your custom script. " + "It may have crashed with an error."), @@ -382,6 +387,7 @@ public enum ErrorMsg { "tried to create too many dynamic partitions. The maximum number of dynamic partitions " + "is controlled by hive.exec.max.dynamic.partitions and hive.exec.max.dynamic.partitions.pernode. "), + //========================== 30000 range starts here ========================// STATSPUBLISHER_NOT_OBTAINED(30000, "StatsPublisher cannot be obtained. " + "There was a error to retrieve the StatsPublisher, and retrying " + "might help. If you dont want the query to fail because accurate statistics " + Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ArchiveUtils.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ArchiveUtils.java?rev=1561947&r1=1561946&r2=1561947&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ArchiveUtils.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ArchiveUtils.java Tue Jan 28 05:48:03 2014 @@ -108,11 +108,11 @@ public final class ArchiveUtils { } catch (MetaException e) { throw new HiveException("Unable to get partitions directories prefix", e); } - URI tableDir = tbl.getDataLocation(); + Path tableDir = tbl.getDataLocation(); if(tableDir == null) { throw new HiveException("Table has no location set"); } - return new Path(tableDir.toString(), prefixSubdir); + return new Path(tableDir, prefixSubdir); } /** * Generates name for prefix partial partition specification. 
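Editorial note, not part of r1561947: the DDLTask.java changes that follow extend the same V2 authorization plugin API that the Driver.java changes above hook into. In the Driver path, each read/write Entity is converted into a HivePrivilegeObject, and the HiveOperationType is derived from the HiveOperation purely by enum-constant name. The sketch below is a self-contained illustration of that mapping; the four enums are invented stand-ins, not Hive's real types.

class AuthzMappingSketch {
  enum Op { QUERY, CREATETABLE }                     // stand-in for HiveOperation
  enum OpType { QUERY, CREATETABLE }                 // stand-in for HiveOperationType
  enum EntityType { DATABASE, TABLE, LOCAL_DIR, DFS_DIR, PARTITION, DUMMYPARTITION }
  enum PrivObjectType { DATABASE, TABLE, URI, PARTITION }

  // Name-based bridge, mirroring HiveOperationType.valueOf(op.name()): safe only while both
  // enums keep identical constant names; a missing counterpart surfaces as an
  // IllegalArgumentException when the privilege check runs.
  static OpType toOpType(Op op) {
    return OpType.valueOf(op.name());
  }

  // Mirrors Driver.getHivePrivilegeObjectType above: directories become URIs, dummy
  // partitions are treated like partitions, anything else maps to null.
  static PrivObjectType toPrivObjectType(EntityType type) {
    switch (type) {
      case DATABASE:       return PrivObjectType.DATABASE;
      case TABLE:          return PrivObjectType.TABLE;
      case LOCAL_DIR:
      case DFS_DIR:        return PrivObjectType.URI;
      case PARTITION:
      case DUMMYPARTITION: return PrivObjectType.PARTITION;
      default:             return null;
    }
  }

  public static void main(String[] args) {
    System.out.println(toOpType(Op.QUERY));                    // QUERY
    System.out.println(toPrivObjectType(EntityType.DFS_DIR));  // URI
  }
}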
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java?rev=1561947&r1=1561946&r2=1561947&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java Tue Jan 28 05:48:03 2014 @@ -22,7 +22,6 @@ import static org.apache.commons.lang.St import static org.apache.hadoop.util.StringUtils.stringifyException; import java.io.BufferedWriter; -import java.io.DataOutput; import java.io.DataOutputStream; import java.io.FileNotFoundException; import java.io.IOException; @@ -157,6 +156,12 @@ import org.apache.hadoop.hive.ql.plan.Un import org.apache.hadoop.hive.ql.plan.UnlockTableDesc; import org.apache.hadoop.hive.ql.plan.api.StageType; import org.apache.hadoop.hive.ql.security.authorization.Privilege; +import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthorizer; +import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrincipal; +import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrincipal.HivePrincipalType; +import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilege; +import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject; +import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject.HivePrivilegeObjectType; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.Deserializer; @@ -302,7 +307,7 @@ public class DDLTask extends Task principals = grantOrRevokeRoleDDL.getPrincipalDesc(); List roles = grantOrRevokeRoleDDL.getRoles(); + + if(SessionState.get().isAuthorizationModeV2()){ + return grantOrRevokeRoleV2(grantOrRevokeRoleDDL); + } + for (PrincipalDesc principal : principals) { String userName = principal.getName(); for (String roleName : roles) { @@ -507,6 +518,28 @@ public class DDLTask extends Task hivePrincipals = getHivePrincipals(grantOrRevokeRoleDDL.getPrincipalDesc()); + List roles = grantOrRevokeRoleDDL.getRoles(); + + if(grantOrRevokeRoleDDL.getGrant()){ + authorizer.grantRole(hivePrincipals, roles, + grantOrRevokeRoleDDL.isGrantOption(), grantorPrinc); + } + else{ + authorizer.revokeRole(hivePrincipals, roles, + grantOrRevokeRoleDDL.isGrantOption(), grantorPrinc); + } + return 0; + } + private int showGrants(ShowGrantDesc showGrantDesc) throws HiveException { StringBuilder builder = new StringBuilder(); try { @@ -514,6 +547,7 @@ public class DDLTask extends Task users = db.showPrivilegeGrant( HiveObjectType.GLOBAL, principalName, principalDesc.getType(), null, null, null, null); @@ -619,7 +653,14 @@ public class DDLTask extends Task principals, List privileges, PrivilegeObjectDesc privSubjectDesc, - String grantor, PrincipalType grantorType, boolean grantOption, boolean isGrant) { + String grantor, PrincipalType grantorType, boolean grantOption, boolean isGrant) + throws HiveException { + + if(SessionState.get().isAuthorizationModeV2()){ + return grantOrRevokePrivilegesV2(principals, privileges, privSubjectDesc, grantor, + grantorType, grantOption, isGrant); + } + if (privileges == null || privileges.size() == 0) { console.printError("No privilege found."); return 1; @@ -638,15 +679,12 @@ public class DDLTask extends Task principals, + List privileges, 
PrivilegeObjectDesc privSubjectDesc, String grantor, + PrincipalType grantorType, boolean grantOption, boolean isGrant) throws HiveException { + HiveAuthorizer authorizer = SessionState.get().getAuthorizerV2(); + + //Convert to object types used by the authorization plugin interface + List hivePrincipals = getHivePrincipals(principals); + List hivePrivileges = getHivePrivileges(privileges); + HivePrivilegeObject hivePrivObject = getHivePrivilegeObject(privSubjectDesc); + HivePrincipal grantorPrincipal = new HivePrincipal(grantor, getHivePrincipalType(grantorType)); + + if(isGrant){ + authorizer.grantPrivileges(hivePrincipals, hivePrivileges, hivePrivObject, + grantorPrincipal, grantOption); + }else { + authorizer.revokePrivileges(hivePrincipals, hivePrivileges, + hivePrivObject, grantorPrincipal, grantOption); + } + //no exception thrown, so looks good + return 0; + } + + private HivePrivilegeObject getHivePrivilegeObject(PrivilegeObjectDesc privSubjectDesc) + throws HiveException { + String [] dbTable = Utilities.getDbTableName(privSubjectDesc.getObject()); + return new HivePrivilegeObject(getPrivObjectType(privSubjectDesc), dbTable[0], dbTable[1]); + } + + private HivePrivilegeObjectType getPrivObjectType(PrivilegeObjectDesc privSubjectDesc) { + //TODO: This needs to change to support view once view grant/revoke is supported as + // part of HIVE-6181 + return privSubjectDesc.getTable() ? HivePrivilegeObjectType.TABLE : HivePrivilegeObjectType.DATABASE; + } + + private List getHivePrivileges(List privileges) { + List hivePrivileges = new ArrayList(); + for(PrivilegeDesc privilege : privileges){ + hivePrivileges.add( + new HivePrivilege(privilege.getPrivilege().toString(), privilege.getColumns())); + } + return hivePrivileges; + } + + private List getHivePrincipals(List principals) throws HiveException { + ArrayList hivePrincipals = new ArrayList(); + for(PrincipalDesc principal : principals){ + hivePrincipals.add( + new HivePrincipal(principal.getName(), getHivePrincipalType(principal.getType()))); + } + return hivePrincipals; + } + + private HivePrincipalType getHivePrincipalType(PrincipalType type) throws HiveException { + switch(type){ + case USER: + return HivePrincipalType.USER; + case ROLE: + return HivePrincipalType.ROLE; + case GROUP: + throw new HiveException(ErrorMsg.UNNSUPPORTED_AUTHORIZATION_PRINCIPAL_TYPE_GROUP); + default: + //should not happen as we take care of all existing types + throw new HiveException("Unsupported authorization type specified"); + } + } + private void throwNotFound(String objType, String objName) throws HiveException { throw new HiveException(objType + " " + objName + " not found"); } - private int roleDDL(RoleDDLDesc roleDDLDesc) { + private int roleDDL(RoleDDLDesc roleDDLDesc) throws HiveException, IOException { + if(SessionState.get().isAuthorizationModeV2()){ + return roleDDLV2(roleDDLDesc); + } + + DataOutputStream outStream = null; RoleDDLDesc.RoleOperation operation = roleDDLDesc.getOperation(); - DataOutput outStream = null; try { if (operation.equals(RoleDDLDesc.RoleOperation.CREATE_ROLE)) { db.createRole(roleDDLDesc.getName(), roleDDLDesc.getRoleOwnerName()); @@ -780,9 +888,20 @@ public class DDLTask extends Task roleNames = db.getAllRoleNames(); + Path resFile = new Path(roleDDLDesc.getResFile()); + FileSystem fs = resFile.getFileSystem(conf); + outStream = fs.create(resFile); + for (String roleName : roleNames) { + outStream.writeBytes(roleName); + outStream.write(terminator); + } + outStream.close(); + outStream = null; } else { throw new 
HiveException("Unkown role operation " + operation.getOperationName()); @@ -802,6 +921,48 @@ public class DDLTask extends Task roles = authorizer.getRoles(new HivePrincipal(roleDDLDesc.getName(), + getHivePrincipalType(roleDDLDesc.getPrincipalType()))); + writeListToFile(roles, roleDDLDesc.getResFile()); + break; + case SHOW_ROLES: + List allRoles = authorizer.getAllRoles(); + writeListToFile(allRoles, roleDDLDesc.getResFile()); + break; + default: + throw new HiveException("Unkown role operation " + + operation.getOperationName()); + } + return 0; + } + + /** + * Write list of string entries into given file + * @param entries + * @param resFile + * @throws IOException + */ + private void writeListToFile(List entries, String resFile) throws IOException { + StringBuilder sb = new StringBuilder(entries.size()*2); + for(String entry : entries){ + sb.append(entry); + sb.append(terminator); + } + writeToFile(sb.toString(), resFile); + } + private int alterDatabase(AlterDatabaseDesc alterDbDesc) throws HiveException { String dbName = alterDbDesc.getDatabaseName(); @@ -883,8 +1044,8 @@ public class DDLTask extends Task parts = db.createPartitions(addPartitionDesc); + for (Partition part : parts) { + work.getOutputs().add(new WriteEntity(part)); } - - - - if (addPartitionDesc.getLocation() == null) { - db.createPartition(tbl, addPartitionDesc.getPartSpec(), null, - addPartitionDesc.getPartParams(), - addPartitionDesc.getInputFormat(), - addPartitionDesc.getOutputFormat(), - addPartitionDesc.getNumBuckets(), - addPartitionDesc.getCols(), - addPartitionDesc.getSerializationLib(), - addPartitionDesc.getSerdeParams(), - addPartitionDesc.getBucketCols(), - addPartitionDesc.getSortCols()); - - } else { - if (tbl.isView()) { - throw new HiveException("LOCATION clause illegal for view partition"); - } - // set partition path relative to table - db.createPartition(tbl, addPartitionDesc.getPartSpec(), new Path(tbl - .getPath(), addPartitionDesc.getLocation()), addPartitionDesc.getPartParams(), - addPartitionDesc.getInputFormat(), - addPartitionDesc.getOutputFormat(), - addPartitionDesc.getNumBuckets(), - addPartitionDesc.getCols(), - addPartitionDesc.getSerializationLib(), - addPartitionDesc.getSerdeParams(), - addPartitionDesc.getBucketCols(), - addPartitionDesc.getSortCols()); - } - - Partition part = db - .getPartition(tbl, addPartitionDesc.getPartSpec(), false); - work.getOutputs().add(new WriteEntity(part)); - return 0; } @@ -1212,12 +1332,12 @@ public class DDLTask extends Task duplicateProps = new ArrayList(); try { Path resFile = new Path(showCreateTbl.getResFile()); @@ -1880,7 +1993,7 @@ public class DDLTask extends Task fNames = getFunctionNames(); - for(String fName : fNames) - { - FunctionInfo fInfo = getFunctionInfo(fName); - if ( fInfo.isGenericUDAF()) - { - WindowFunctionInfo wInfo = new WindowFunctionInfo(fInfo); - windowFunctions.put(fName, wInfo); - } - } + static private void addFunctionInfoToWindowFunctions(String functionName, + FunctionInfo functionInfo) { + // Assumes that the caller has already verified that functionInfo is for an aggregate function + WindowFunctionInfo wInfo = new WindowFunctionInfo(functionInfo); + windowFunctions.put(functionName.toLowerCase(), wInfo); } public static boolean isTableFunction(String name) Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java?rev=1561947&r1=1561946&r2=1561947&view=diff 
============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java Tue Jan 28 05:48:03 2014 @@ -265,9 +265,9 @@ public class HashTableSinkOperator exten public void closeOp(boolean abort) throws HiveException { try { if (mapJoinTables != null) { - // get tmp file URI - String tmpURI = this.getExecContext().getLocalWork().getTmpFileURI(); - LOG.info("Temp URI for side table: " + tmpURI); + // get tmp path + Path tmpPath = this.getExecContext().getLocalWork().getTmpPath(); + LOG.info("Temp URI for side table: " + tmpPath); for (byte tag = 0; tag < mapJoinTables.length; tag++) { // get the key and value MapJoinTableContainer tableContainer = mapJoinTables[tag]; @@ -279,10 +279,9 @@ public class HashTableSinkOperator exten String fileName = getExecContext().getLocalWork().getBucketFileName(bigBucketFileName); // get the tmp URI path; it will be a hdfs path if not local mode String dumpFilePrefix = conf.getDumpFilePrefix(); - String tmpURIPath = Utilities.generatePath(tmpURI, dumpFilePrefix, tag, fileName); - console.printInfo(Utilities.now() + "\tDump the side-table into file: " + tmpURIPath); + Path path = Utilities.generatePath(tmpPath, dumpFilePrefix, tag, fileName); + console.printInfo(Utilities.now() + "\tDump the side-table into file: " + path); // get the hashtable file and path - Path path = new Path(tmpURIPath); FileSystem fs = path.getFileSystem(hconf); ObjectOutputStream out = new ObjectOutputStream(new BufferedOutputStream(fs.create(path), 4096)); try { @@ -291,7 +290,7 @@ public class HashTableSinkOperator exten out.close(); } tableContainer.clear(); - console.printInfo(Utilities.now() + "\tUpload 1 File to: " + tmpURIPath); + console.printInfo(Utilities.now() + "\tUpload 1 File to: " + path); } } super.closeOp(abort); Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java?rev=1561947&r1=1561946&r2=1561947&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java Tue Jan 28 05:48:03 2014 @@ -20,7 +20,6 @@ package org.apache.hadoop.hive.ql.exec; import java.io.IOException; import java.io.Serializable; -import java.util.ArrayList; import java.util.List; import org.apache.commons.logging.Log; @@ -150,7 +149,7 @@ public class JoinOperator extends Common if (conf.getHandleSkewJoin()) { try { for (int i = 0; i < numAliases; i++) { - String specPath = conf.getBigKeysDirMap().get((byte) i); + Path specPath = conf.getBigKeysDirMap().get((byte) i); mvFileToFinalPath(specPath, hconf, success, LOG); for (int j = 0; j < numAliases; j++) { if (j == i) { @@ -165,7 +164,7 @@ public class JoinOperator extends Common if (success) { // move up files for (int i = 0; i < numAliases; i++) { - String specPath = conf.getBigKeysDirMap().get((byte) i); + Path specPath = conf.getBigKeysDirMap().get((byte) i); moveUpFiles(specPath, hconf, LOG); for (int j = 0; j < numAliases; j++) { if (j == i) { @@ -184,16 +183,15 @@ public class JoinOperator extends Common super.jobCloseOp(hconf, success); } - private void moveUpFiles(String specPath, Configuration hconf, 
Log log) + private void moveUpFiles(Path specPath, Configuration hconf, Log log) throws IOException, HiveException { - FileSystem fs = (new Path(specPath)).getFileSystem(hconf); - Path finalPath = new Path(specPath); + FileSystem fs = specPath.getFileSystem(hconf); - if (fs.exists(finalPath)) { - FileStatus[] taskOutputDirs = fs.listStatus(finalPath); + if (fs.exists(specPath)) { + FileStatus[] taskOutputDirs = fs.listStatus(specPath); if (taskOutputDirs != null) { for (FileStatus dir : taskOutputDirs) { - Utilities.renameOrMoveFiles(fs, dir.getPath(), finalPath); + Utilities.renameOrMoveFiles(fs, dir.getPath(), specPath); fs.delete(dir.getPath(), true); } } @@ -210,15 +208,13 @@ public class JoinOperator extends Common * @throws IOException * @throws HiveException */ - private void mvFileToFinalPath(String specPath, Configuration hconf, + private void mvFileToFinalPath(Path specPath, Configuration hconf, boolean success, Log log) throws IOException, HiveException { - FileSystem fs = (new Path(specPath)).getFileSystem(hconf); + FileSystem fs = specPath.getFileSystem(hconf); Path tmpPath = Utilities.toTempPath(specPath); Path intermediatePath = new Path(tmpPath.getParent(), tmpPath.getName() + ".intermediate"); - Path finalPath = new Path(specPath); - ArrayList emptyBuckets = null; if (success) { if (fs.exists(tmpPath)) { // Step1: rename tmp output folder to intermediate path. After this @@ -229,8 +225,8 @@ public class JoinOperator extends Common // Step2: remove any tmp file or double-committed output files Utilities.removeTempOrDuplicateFiles(fs, intermediatePath); // Step3: move to the file destination - log.info("Moving tmp dir: " + intermediatePath + " to: " + finalPath); - Utilities.renameOrMoveFiles(fs, intermediatePath, finalPath); + log.info("Moving tmp dir: " + intermediatePath + " to: " + specPath); + Utilities.renameOrMoveFiles(fs, intermediatePath, specPath); } } else { fs.delete(tmpPath, true); Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java?rev=1561947&r1=1561946&r2=1561947&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java Tue Jan 28 05:48:03 2014 @@ -251,7 +251,7 @@ public class MoveTask extends Task files; FileSystem fs; try { - fs = FileSystem.get(table.getDataLocation(), conf); + fs = table.getDataLocation().getFileSystem(conf); dirs = fs.globStatus(tbd.getSourcePath()); files = new ArrayList(); for (int i = 0; (dirs != null && i < dirs.length); i++) { @@ -460,9 +460,9 @@ public class MoveTask extends Task(SelectDesc.class, VectorSelectOperator.class)); vectorOpvec.add(new OpTuple(GroupByDesc.class, VectorGroupByOperator.class)); vectorOpvec.add(new OpTuple(MapJoinDesc.class, VectorMapJoinOperator.class)); + vectorOpvec.add(new OpTuple(SMBJoinDesc.class, VectorSMBMapJoinOperator.class)); vectorOpvec.add(new OpTuple(ReduceSinkDesc.class, VectorReduceSinkOperator.class)); vectorOpvec.add(new OpTuple(FileSinkDesc.class, VectorFileSinkOperator.class)); Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java?rev=1561947&r1=1561946&r2=1561947&view=diff 
============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java Tue Jan 28 05:48:03 2014 @@ -226,6 +226,11 @@ public class SMBMapJoinOperator extends public void cleanUpInputFileChangedOp() throws HiveException { inputFileChanged = true; } + + protected List smbJoinComputeKeys(Object row, byte alias) throws HiveException { + return JoinUtil.computeKeys(row, joinKeys[alias], + joinKeysObjectInspectors[alias]); + } @Override public void processOp(Object row, int tag) throws HiveException { @@ -260,8 +265,8 @@ public class SMBMapJoinOperator extends byte alias = (byte) tag; // compute keys and values as StandardObjects - ArrayList key = JoinUtil.computeKeys(row, joinKeys[alias], - joinKeysObjectInspectors[alias]); + List key = smbJoinComputeKeys(row, alias); + List value = getFilteredValue(alias, row); @@ -495,7 +500,7 @@ public class SMBMapJoinOperator extends return smallestOne == null ? null : result; } - private boolean processKey(byte alias, ArrayList key) + private boolean processKey(byte alias, List key) throws HiveException { List keyWritable = keyWritables[alias]; if (keyWritable == null) { Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java?rev=1561947&r1=1561946&r2=1561947&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java Tue Jan 28 05:48:03 2014 @@ -177,7 +177,7 @@ public class SkewJoinHandler { void endGroup() throws IOException, HiveException { if (skewKeyInCurrentGroup) { - String specPath = conf.getBigKeysDirMap().get((byte) currBigKeyTag); + Path specPath = conf.getBigKeysDirMap().get((byte) currBigKeyTag); RowContainer> bigKey = (RowContainer)joinOp.storage[currBigKeyTag]; Path outputPath = getOperatorOutputPath(specPath); FileSystem destFs = outputPath.getFileSystem(hconf); @@ -258,7 +258,7 @@ public class SkewJoinHandler { } try { - String specPath = conf.getBigKeysDirMap().get((byte) bigKeyTbl); + Path specPath = conf.getBigKeysDirMap().get((byte) bigKeyTbl); Path bigKeyPath = getOperatorOutputPath(specPath); FileSystem fs = bigKeyPath.getFileSystem(hconf); delete(bigKeyPath, fs); @@ -295,7 +295,7 @@ public class SkewJoinHandler { continue; } - String specPath = conf.getBigKeysDirMap().get( + Path specPath = conf.getBigKeysDirMap().get( Byte.valueOf((byte) bigKeyTbl)); commitOutputPathToFinalPath(specPath, false); for (int smallKeyTbl = 0; smallKeyTbl < numAliases; smallKeyTbl++) { @@ -311,7 +311,7 @@ public class SkewJoinHandler { } } - private void commitOutputPathToFinalPath(String specPath, + private void commitOutputPathToFinalPath(Path specPath, boolean ignoreNonExisting) throws IOException { Path outPath = getOperatorOutputPath(specPath); Path finalPath = getOperatorFinalPath(specPath); @@ -334,14 +334,12 @@ public class SkewJoinHandler { } } - private Path getOperatorOutputPath(String specPath) throws IOException { - Path tmpPath = Utilities.toTempPath(specPath); - return new Path(tmpPath, Utilities.toTempPath(taskId)); + private Path getOperatorOutputPath(Path specPath) throws IOException { + 
return new Path(Utilities.toTempPath(specPath), Utilities.toTempPath(taskId)); } - private Path getOperatorFinalPath(String specPath) throws IOException { - Path tmpPath = Utilities.toTempPath(specPath); - return new Path(tmpPath, taskId); + private Path getOperatorFinalPath(Path specPath) throws IOException { + return new Path(Utilities.toTempPath(specPath), taskId); } public void setSkewJoinJobCounter(LongWritable skewjoinFollowupJobs) { Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java?rev=1561947&r1=1561946&r2=1561947&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java Tue Jan 28 05:48:03 2014 @@ -171,9 +171,7 @@ public class StatsTask extends Task parameters, String aggKey, boolean atomic) throws HiveException { + Map parameters, String prefix, int maxPrefixLength, boolean atomic) + throws HiveException { + + String aggKey = Utilities.getHashedStatsPrefix(prefix, maxPrefixLength); for (String statType : StatsSetupConst.statsRequireCompute) { String value = statsAggregator.aggregateStats(aggKey, statType); @@ -317,6 +316,7 @@ public class StatsTask extends Task parameters, Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java?rev=1561947&r1=1561946&r2=1561947&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java Tue Jan 28 05:48:03 2014 @@ -291,14 +291,11 @@ public class TableScanOperator extends O statsToPublish.clear(); String prefix = Utilities.join(conf.getStatsAggPrefix(), pspecs); - String key; int maxKeyLength = conf.getMaxStatsKeyPrefixLength(); - if (statsPublisher instanceof CounterStatsPublisher) { - key = Utilities.getHashedStatsPrefix(prefix, maxKeyLength, 0); - } else { + String key = Utilities.getHashedStatsPrefix(prefix, maxKeyLength); + if (!(statsPublisher instanceof CounterStatsPublisher)) { // stats publisher except counter type needs postfix 'taskID' - prefix = Utilities.getHashedStatsPrefix(prefix, maxKeyLength, taskID.length()); - key = prefix + taskID; + key = Utilities.join(prefix, taskID); } for(String statType : stats.get(pspecs).getStoredStats()) { statsToPublish.put(statType, Long.toString(stats.get(pspecs).getStat(statType))); Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1561947&r1=1561946&r2=1561947&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Tue Jan 28 05:48:03 2014 @@ -70,6 +70,10 @@ import java.util.Properties; import java.util.Random; import java.util.Set; import java.util.UUID; +import java.util.zip.Deflater; +import java.util.zip.DeflaterOutputStream; +import 
java.util.zip.Inflater; +import java.util.zip.InflaterInputStream; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -95,7 +99,6 @@ import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hive.common.HiveInterruptCallback; import org.apache.hadoop.hive.common.HiveInterruptUtils; import org.apache.hadoop.hive.common.HiveStatsUtils; -import org.apache.hadoop.hive.common.ObjectPair; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.Warehouse; @@ -133,12 +136,9 @@ import org.apache.hadoop.hive.ql.metadat import org.apache.hadoop.hive.ql.metadata.InputEstimator; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; -import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner; -import org.apache.hadoop.hive.ql.parse.PrunedPartitionList; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx; -import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.plan.FileSinkDesc; import org.apache.hadoop.hive.ql.plan.GroupByDesc; @@ -184,8 +184,6 @@ import com.esotericsoftware.kryo.io.Inpu import com.esotericsoftware.kryo.io.Output; import com.esotericsoftware.kryo.serializers.FieldSerializer; -import org.apache.commons.codec.binary.Base64; - /** * Utilities. * @@ -324,6 +322,7 @@ public final class Utilities { } byte[] planBytes = Base64.decodeBase64(planString); in = new ByteArrayInputStream(planBytes); + in = new InflaterInputStream(in); } else { in = new FileInputStream(localPath.toUri().getPath()); } @@ -554,22 +553,31 @@ public final class Utilities { } } - public static void setMapRedWork(Configuration conf, MapredWork w, String hiveScratchDir) { + public static class PathDelegate extends PersistenceDelegate { + @Override + protected Expression instantiate(Object oldInstance, Encoder out) { + Path p = (Path)oldInstance; + Object[] args = {p.toString()}; + return new Expression(p, p.getClass(), "new", args); + } + } + + public static void setMapRedWork(Configuration conf, MapredWork w, Path hiveScratchDir) { setMapWork(conf, w.getMapWork(), hiveScratchDir, true); if (w.getReduceWork() != null) { setReduceWork(conf, w.getReduceWork(), hiveScratchDir, true); } } - public static Path setMapWork(Configuration conf, MapWork w, String hiveScratchDir, boolean useCache) { + public static Path setMapWork(Configuration conf, MapWork w, Path hiveScratchDir, boolean useCache) { return setBaseWork(conf, w, hiveScratchDir, MAP_PLAN_NAME, useCache); } - public static Path setReduceWork(Configuration conf, ReduceWork w, String hiveScratchDir, boolean useCache) { + public static Path setReduceWork(Configuration conf, ReduceWork w, Path hiveScratchDir, boolean useCache) { return setBaseWork(conf, w, hiveScratchDir, REDUCE_PLAN_NAME, useCache); } - private static Path setBaseWork(Configuration conf, BaseWork w, String hiveScratchDir, String name, boolean useCache) { + private static Path setBaseWork(Configuration conf, BaseWork w, Path hiveScratchDir, String name, boolean useCache) { try { setPlanPath(conf, hiveScratchDir); @@ -579,11 +587,12 @@ public final class Utilities { if (HiveConf.getBoolVar(conf, ConfVars.HIVE_RPC_QUERY_PLAN)) { // add it to the conf - out = new ByteArrayOutputStream(); 
+ ByteArrayOutputStream byteOut = new ByteArrayOutputStream(); + out = new DeflaterOutputStream(byteOut, new Deflater(Deflater.BEST_SPEED)); serializePlan(w, out, conf); LOG.info("Setting plan: "+planPath.toUri().getPath()); conf.set(planPath.toUri().getPath(), - Base64.encodeBase64String(((ByteArrayOutputStream)out).toByteArray())); + Base64.encodeBase64String(byteOut.toByteArray())); } else { // use the default file system of the conf FileSystem fs = planPath.getFileSystem(conf); @@ -626,7 +635,7 @@ public final class Utilities { return new Path(planPath, name); } - private static void setPlanPath(Configuration conf, String hiveScratchDir) throws IOException { + private static void setPlanPath(Configuration conf, Path hiveScratchDir) throws IOException { if (getPlanPath(conf) == null) { // this is the unique conf ID, which is kept in JobConf as part of the plan file name String jobID = UUID.randomUUID().toString(); @@ -814,8 +823,9 @@ public final class Utilities { /** * Deserializes the plan. * @param in The stream to read from. + * @param planClass class of plan + * @param conf configuration * @return The plan, such as QueryPlan, MapredWork, etc. - * @param To know what serialization format plan is in */ public static T deserializePlan(InputStream in, Class planClass, Configuration conf) { return deserializePlan(in, planClass, conf, false); @@ -860,6 +870,7 @@ public final class Utilities { e.setPersistenceDelegate(org.datanucleus.store.types.backed.Map.class, new MapDelegate()); e.setPersistenceDelegate(org.datanucleus.store.types.backed.List.class, new ListDelegate()); e.setPersistenceDelegate(CommonToken.class, new CommonTokenDelegate()); + e.setPersistenceDelegate(Path.class, new PathDelegate()); e.writeObject(plan); e.close(); @@ -1420,10 +1431,6 @@ public final class Utilities { return new Path(orig.getParent(), taskTmpPrefix + orig.getName()); } - public static Path toTaskTempPath(String orig) { - return toTaskTempPath(new Path(orig)); - } - public static Path toTempPath(Path orig) { if (orig.getName().indexOf(tmpPrefix) == 0) { return orig; @@ -1682,15 +1689,14 @@ public final class Utilities { } } - public static void mvFileToFinalPath(String specPath, Configuration hconf, + public static void mvFileToFinalPath(Path specPath, Configuration hconf, boolean success, Log log, DynamicPartitionCtx dpCtx, FileSinkDesc conf, Reporter reporter) throws IOException, HiveException { - FileSystem fs = (new Path(specPath)).getFileSystem(hconf); + FileSystem fs = specPath.getFileSystem(hconf); Path tmpPath = Utilities.toTempPath(specPath); Path taskTmpPath = Utilities.toTaskTempPath(specPath); - Path finalPath = new Path(specPath); if (success) { if (fs.exists(tmpPath)) { // remove any tmp file or double-committed output files @@ -1702,8 +1708,8 @@ public final class Utilities { } // move to the file destination - log.info("Moving tmp dir: " + tmpPath + " to: " + finalPath); - Utilities.renameOrMoveFiles(fs, tmpPath, finalPath); + log.info("Moving tmp dir: " + tmpPath + " to: " + specPath); + Utilities.renameOrMoveFiles(fs, tmpPath, specPath); } } else { fs.delete(tmpPath, true); @@ -2004,6 +2010,25 @@ public final class Utilities { return names; } + /** + * Extract db and table name from dbtable string, where db and table are separated by "." 
+ * If there is no db name part, set the current sessions default db + * @param dbtable + * @return String array with two elements, first is db name, second is table name + * @throws HiveException + */ + public static String[] getDbTableName(String dbtable) throws HiveException{ + String[] names = dbtable.split("\\."); + switch (names.length) { + case 2: + return names; + case 1: + return new String [] {SessionState.get().getCurrentDatabase(), dbtable}; + default: + throw new HiveException(ErrorMsg.INVALID_TABLE_NAME, dbtable); + } + } + public static void validateColumnNames(List colNames, List checkCols) throws SemanticException { Iterator checkColsIter = checkCols.iterator(); @@ -2181,6 +2206,8 @@ public final class Utilities { } resultMap.put(pathStr, new ContentSummary(total, -1, -1)); } + // todo: should nullify summary for non-native tables, + // not to be selected as a mapjoin target FileSystem fs = p.getFileSystem(myConf); resultMap.put(pathStr, fs.getContentSummary(p)); } catch (Exception e) { @@ -2242,6 +2269,23 @@ public final class Utilities { } } + // return sum of lengths except one alias. returns -1 if any of other alias is unknown + public static long sumOfExcept(Map aliasToSize, + Set aliases, String except) { + long total = 0; + for (String alias : aliases) { + if (alias.equals(except)) { + continue; + } + Long size = aliasToSize.get(alias); + if (size == null) { + return -1; + } + total += size; + } + return total; + } + public static boolean isEmptyPath(JobConf job, Path dirPath, Context ctx) throws Exception { ContentSummary cs = ctx.getCS(dirPath); @@ -2315,7 +2359,7 @@ public final class Utilities { DynamicPartitionCtx dpCtx) throws HiveException { try { - Path loadPath = new Path(dpCtx.getRootPath()); + Path loadPath = dpCtx.getRootPath(); FileSystem fs = loadPath.getFileSystem(conf); int numDPCols = dpCtx.getNumDPCols(); FileStatus[] status = HiveStatsUtils.getFileStatusRecurse(loadPath, numDPCols, fs); @@ -2360,16 +2404,15 @@ public final class Utilities { * then it returns an MD5 hash of statsPrefix followed by path separator, otherwise * it returns statsPrefix * - * @param statsPrefix - * @param maxPrefixLength - * @return + * @param statsPrefix prefix of stats key + * @param maxPrefixLength max length of stats key + * @return if the length of prefix is longer than max, return MD5 hashed value of the prefix */ - public static String getHashedStatsPrefix(String statsPrefix, - int maxPrefixLength, int postfixLength) { + public static String getHashedStatsPrefix(String statsPrefix, int maxPrefixLength) { // todo: this might return possibly longer prefix than // maxPrefixLength (if set) when maxPrefixLength - postfixLength < 17, // which would make stat values invalid (especially for 'counter' type) - if (maxPrefixLength >= 0 && statsPrefix.length() > maxPrefixLength - postfixLength) { + if (maxPrefixLength >= 0 && statsPrefix.length() > maxPrefixLength) { try { MessageDigest digester = MessageDigest.getInstance("MD5"); digester.update(statsPrefix.getBytes()); @@ -2441,28 +2484,12 @@ public final class Utilities { jobConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypesString); } - public static void validatePartSpecColumnNames(Table tbl, Map partSpec) - throws SemanticException { - - List parts = tbl.getPartitionKeys(); - Set partCols = new HashSet(parts.size()); - for (FieldSchema col : parts) { - partCols.add(col.getName()); - } - for (String col : partSpec.keySet()) { - if (!partCols.contains(col)) { - throw new 
SemanticException(ErrorMsg.NONEXISTPARTCOL.getMsg(col)); - } - } - } - public static String suffix = ".hashtable"; - public static String generatePath(String baseURI, String dumpFilePrefix, + public static Path generatePath(Path basePath, String dumpFilePrefix, Byte tag, String bigBucketFileName) { - String path = new String(baseURI + Path.SEPARATOR + "MapJoin-" + dumpFilePrefix + tag + + return new Path(basePath, "MapJoin-" + dumpFilePrefix + tag + "-" + bigBucketFileName + suffix); - return path; } public static String generateFileName(Byte tag, String bigBucketFileName) { @@ -2470,24 +2497,16 @@ public final class Utilities { return fileName; } - public static String generateTmpURI(String baseURI, String id) { - String tmpFileURI = new String(baseURI + Path.SEPARATOR + "HashTable-" + id); - return tmpFileURI; + public static Path generateTmpPath(Path basePath, String id) { + return new Path(basePath, "HashTable-" + id); } - public static String generateTarURI(String baseURI, String filename) { - String tmpFileURI = new String(baseURI + Path.SEPARATOR + filename + ".tar.gz"); - return tmpFileURI; - } - - public static String generateTarURI(Path baseURI, String filename) { - String tmpFileURI = new String(baseURI + Path.SEPARATOR + filename + ".tar.gz"); - return tmpFileURI; + public static Path generateTarPath(Path basePath, String filename) { + return new Path(basePath, filename + ".tar.gz"); } public static String generateTarFileName(String name) { - String tmpFileURI = new String(name + ".tar.gz"); - return tmpFileURI; + return name + ".tar.gz"; } public static String generatePath(Path baseURI, String filename) { @@ -2898,6 +2917,16 @@ public final class Utilities { } /** + * On Tez we're not creating dummy files when getting/setting input paths. + * We let Tez handle the situation. We're also setting the paths in the AM + * so we don't want to depend on scratch dir and context. + */ + public static List getInputPathsTez(JobConf job, MapWork work) throws Exception { + List paths = getInputPaths(job, work, null, null); + return paths; + } + + /** * Computes a list of all input paths needed to compute the given MapWork. All aliases * are considered and a merged list of input paths is returned. 
If any input path points * to an empty table or partition a dummy file in the scratch dir is instead created and @@ -2911,7 +2940,7 @@ public final class Utilities { * @return List of paths to process for the given MapWork * @throws Exception */ - public static List getInputPaths(JobConf job, MapWork work, String hiveScratchDir, Context ctx) + public static List getInputPaths(JobConf job, MapWork work, Path hiveScratchDir, Context ctx) throws Exception { int sequenceNumber = 0; @@ -2966,7 +2995,7 @@ public final class Utilities { } @SuppressWarnings({"rawtypes", "unchecked"}) - private static Path createEmptyFile(String hiveScratchDir, + private static Path createEmptyFile(Path hiveScratchDir, Class outFileFormat, JobConf job, int sequenceNumber, Properties props, boolean dummyRow) throws IOException, InstantiationException, IllegalAccessException { @@ -2983,7 +3012,6 @@ public final class Utilities { String newFile = newDir + File.separator + "emptyFile"; Path newFilePath = new Path(newFile); - String onefile = newPath.toString(); FSRecordWriter recWriter = outFileFormat.newInstance().getHiveRecordWriter(job, newFilePath, Text.class, false, props, null); if (dummyRow) { @@ -2999,7 +3027,7 @@ public final class Utilities { @SuppressWarnings("rawtypes") private static Path createDummyFileForEmptyPartition(Path path, JobConf job, MapWork work, - String hiveScratchDir, String alias, int sequenceNumber) + Path hiveScratchDir, String alias, int sequenceNumber) throws IOException, InstantiationException, IllegalAccessException { String strPath = path.toString(); @@ -3041,7 +3069,7 @@ public final class Utilities { @SuppressWarnings("rawtypes") private static Path createDummyFileForEmptyTable(JobConf job, MapWork work, - String hiveScratchDir, String alias, int sequenceNumber) + Path hiveScratchDir, String alias, int sequenceNumber) throws IOException, InstantiationException, IllegalAccessException { TableDesc tableDesc = work.getAliasToPartnInfo().get(alias).getTableDesc(); @@ -3169,10 +3197,10 @@ public final class Utilities { if (op instanceof FileSinkOperator) { FileSinkDesc fdesc = ((FileSinkOperator) op).getConf(); - String tempDir = fdesc.getDirName(); + Path tempDir = fdesc.getDirName(); if (tempDir != null) { - Path tempPath = Utilities.toTempPath(new Path(tempDir)); + Path tempPath = Utilities.toTempPath(tempDir); FileSystem fs = tempPath.getFileSystem(conf); fs.mkdirs(tempPath); } @@ -3184,10 +3212,24 @@ public final class Utilities { } } - public static void clearWorkMap() { + /** + * Returns true if a plan is both configured for vectorized execution + * and vectorization is allowed. The plan may be configured for vectorization + * but vectorization dissalowed eg. for FetchOperator execution. 
+ */ + public static boolean isVectorMode(Configuration conf) { + if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED) && + Utilities.getPlanPath(conf) != null && Utilities + .getMapRedWork(conf).getMapWork().getVectorMode()) { + return true; + } + return false; + } + + public static void clearWorkMap() { gWorkMap.clear(); } - + /** * Create a temp dir in specified baseDir * This can go away once hive moves to support only JDK 7 Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java?rev=1561947&r1=1561946&r2=1561947&view=diff ============================================================================== --- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java (original) +++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java Tue Jan 28 05:48:03 2014 @@ -203,7 +203,6 @@ public class ExecDriver extends Task inputPaths = Utilities.getInputPaths(job, mWork, emptyScratchDirStr, ctx); + List inputPaths = Utilities.getInputPaths(job, mWork, emptyScratchDir, ctx); Utilities.setInputPaths(job, inputPaths); - Utilities.setMapRedWork(job, work, ctx.getMRTmpFileURI()); + Utilities.setMapRedWork(job, work, ctx.getMRTmpPath()); if (mWork.getSamplingType() > 0 && rWork != null && rWork.getNumReduceTasks() > 1) { try { @@ -502,8 +497,8 @@ public class ExecDriver extends Task
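Editorial sketch, not part of r1561947: the DriverContext.java change earlier in this patch adds a prepare(QueryPlan)/finished(TaskRunner) pair so the driver can record which StatsTask owns which stats aggregation key and, once a MapRedTask completes, mark that task as the source of every StatsTask whose key matches a stats-gathering FileSinkOperator in the finished work. The self-contained model below replaces Hive's classes with invented stubs (StatsTaskStub, FileSinkStub) and flat lists; the real code walks the operator trees with NodeUtils.iterateTask/iterate.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class StatsLinkSketch {

  // Stands in for StatsTask: identified by the aggregation key of its StatsWork,
  // and pointed at the task that produced its data via setSourceTask().
  static class StatsTaskStub {
    final String aggKey;
    Object sourceTask;
    StatsTaskStub(String aggKey) { this.aggKey = aggKey; }
  }

  // Stands in for FileSinkOperator/FileSinkDesc: may gather stats under a prefix.
  static class FileSinkStub {
    final boolean gatherStats;
    final String statsAggPrefix;
    FileSinkStub(boolean gatherStats, String statsAggPrefix) {
      this.gatherStats = gatherStats;
      this.statsAggPrefix = statsAggPrefix;
    }
  }

  // prepare(QueryPlan): index every StatsTask in the plan by its aggregation key.
  static Map<String, StatsTaskStub> prepare(List<StatsTaskStub> statsTasksInPlan) {
    Map<String, StatsTaskStub> byKey = new HashMap<String, StatsTaskStub>();
    for (StatsTaskStub st : statsTasksInPlan) {
      byKey.put(st.aggKey, st);
    }
    return byKey;
  }

  // finished(TaskRunner): once a map-reduce task completes, collect the stats prefixes of
  // its file sinks and point the matching StatsTask at that task as its source.
  static void finished(Object finishedMapRedTask, List<FileSinkStub> itsFileSinks,
      Map<String, StatsTaskStub> statsTasks) {
    List<String> statKeys = new ArrayList<String>();
    for (FileSinkStub fsOp : itsFileSinks) {
      if (fsOp.gatherStats) {
        statKeys.add(fsOp.statsAggPrefix);
      }
    }
    for (String statKey : statKeys) {
      StatsTaskStub target = statsTasks.get(statKey);
      if (target != null) {            // the real code assumes the key is always present
        target.sourceTask = finishedMapRedTask;
      }
    }
  }

  public static void main(String[] args) {
    Map<String, StatsTaskStub> statsTasks =
        prepare(Arrays.asList(new StatsTaskStub("default.src/")));
    finished("MapRedTask-1", Arrays.asList(new FileSinkStub(true, "default.src/")), statsTasks);
    System.out.println(statsTasks.get("default.src/").sourceTask);  // prints MapRedTask-1
  }
}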
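Editorial sketch, not part of r1561947: with HIVE_RPC_QUERY_PLAN enabled, the Utilities.java change above now deflates the serialized plan (Deflater.BEST_SPEED) before Base64-encoding it into the job configuration, and wraps the decoded bytes in an InflaterInputStream on the read side. The standalone round trip below uses java.util.Base64 and a raw byte array where the patch uses commons-codec Base64 and serializePlan(); the explicit close() before reading the buffer is what lets the deflater finish its output.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Base64;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

class CompressedPlanSketch {

  // Write side: serialize into a deflater stream, then Base64 the compressed bytes so they
  // can be stored as a string property in the job conf.
  static String encode(byte[] serializedPlan) throws IOException {
    ByteArrayOutputStream byteOut = new ByteArrayOutputStream();
    DeflaterOutputStream out = new DeflaterOutputStream(byteOut, new Deflater(Deflater.BEST_SPEED));
    out.write(serializedPlan);
    out.close();                        // finishes compression before byteOut is read
    return Base64.getEncoder().encodeToString(byteOut.toByteArray());
  }

  // Read side: Base64-decode, then inflate while reading.
  static byte[] decode(String planString) throws IOException {
    InputStream in = new ByteArrayInputStream(Base64.getDecoder().decode(planString));
    in = new InflaterInputStream(in);
    ByteArrayOutputStream restored = new ByteArrayOutputStream();
    byte[] buf = new byte[4096];
    for (int n; (n = in.read(buf)) != -1; ) {
      restored.write(buf, 0, n);
    }
    return restored.toByteArray();
  }

  public static void main(String[] args) throws IOException {
    String roundTrip = new String(decode(encode("select * from src".getBytes())));
    System.out.println(roundTrip);      // select * from src
  }
}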
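Editorial sketch, not part of r1561947: the new Utilities.getDbTableName helper above resolves a table reference that may or may not be qualified with a database, falling back to the session's current database and rejecting names with more than one dot. In the standalone version below the SessionState lookup is replaced by an explicit currentDb parameter and HiveException by IllegalArgumentException.

class DbTableNameSketch {

  // Returns {dbName, tableName}; a bare table name resolves against currentDb.
  static String[] getDbTableName(String currentDb, String dbtable) {
    String[] names = dbtable.split("\\.");
    switch (names.length) {
      case 2:  return names;
      case 1:  return new String[] {currentDb, dbtable};
      default: throw new IllegalArgumentException("Invalid table name " + dbtable);
    }
  }

  public static void main(String[] args) {
    System.out.println(String.join(".", getDbTableName("default", "src")));      // default.src
    System.out.println(String.join(".", getDbTableName("default", "db1.src")));  // db1.src
  }
}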