svn commit: r1149763 [1/2] - in /incubator/hcatalog/trunk: ./ src/java/org/apache/hcatalog/common/ src/java/org/apache/hcatalog/har/ src/java/org/apache/hcatalog/mapreduce/ src/java/org/apache/hcatalog/pig/ src/java/org/apache/hcatalog/rcfile/ src/test...
Author: khorgath
Date: Fri Jul 22 23:38:07 2011
New Revision: 1149763

URL: http://svn.apache.org/viewvc?rev=1149763&view=rev
Log:
HCATALOG-42 Dynamic Partitioning

Added:
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/har/
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/har/HarOutputCommitterPostProcessor.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
Modified:
    incubator/hcatalog/trunk/build.xml
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/ErrorType.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatUtil.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputCommitter.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputCommitter.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputFormat.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputCommitter.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputStorageDriver.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordWriter.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputJobInfo.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatEximStorer.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/PigHCatUtil.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/rcfile/RCFileMapReduceOutputFormat.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/rcfile/RCFileOutputDriver.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatEximInputFormat.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatEximOutputFormat.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatStorer.java
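For context before the per-file hunks: with this change, HCatOutputFormat.setOutput() accepts a partition specification that omits some (or all) partition keys; the omitted keys are treated as dynamic and resolved per record at write time. A rough usage sketch follows. The employee(emp_country, emp_state) layout is taken from comments in this patch, while the HCatTableInfo.getOutputTableInfo argument list, the metastore URI, and the class/job names are assumptions for illustration only:

  import java.util.HashMap;
  import java.util.Map;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hcatalog.mapreduce.HCatOutputFormat;
  import org.apache.hcatalog.mapreduce.HCatTableInfo;

  public class DynPtnWriteExample {
    public static void main(String[] args) throws Exception {
      // Table "employee" is partitioned on (emp_country, emp_state); only
      // emp_country is supplied statically, so emp_state becomes a dynamic
      // partition key resolved from each record as it is written.
      Map<String, String> partialSpec = new HashMap<String, String>();
      partialSpec.put("emp_country", "IN");   // static value
      // "emp_state" deliberately omitted     -> dynamic

      Job job = new Job(new Configuration(), "dyn-ptn-demo");
      job.setOutputFormatClass(HCatOutputFormat.class);
      // Metastore URI, principal, database and table names are placeholders.
      HCatOutputFormat.setOutput(job,
          HCatTableInfo.getOutputTableInfo("thrift://localhost:9083", null,
              "default", "employee", partialSpec));
      // ... then configure mapper/input, set the output schema, and submit the job.
    }
  }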
Modified: incubator/hcatalog/trunk/build.xml
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/build.xml?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/build.xml (original)
+++ incubator/hcatalog/trunk/build.xml Fri Jul 22 23:38:07 2011
@@ -107,6 +107,7 @@
+

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/ErrorType.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/ErrorType.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/ErrorType.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/ErrorType.java Fri Jul 22 23:38:07 2011
@@ -41,7 +41,7 @@ public enum ErrorType {
   ERROR_INVALID_PARTITION_VALUES  (2010, "Invalid partition values specified"),
   ERROR_MISSING_PARTITION_KEY     (2011, "Partition key value not provided for publish"),
   ERROR_MOVE_FAILED               (2012, "Moving of data failed during commit"),
-
+  ERROR_TOO_MANY_DYNAMIC_PTNS     (2013, "Attempt to create too many dynamic partitions"),
 
   /* Authorization Errors 3000 - 3999 */
   ERROR_ACCESS_CONTROL            (3000, "Permission denied"),

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java Fri Jul 22 23:38:07 2011
@@ -64,13 +64,18 @@ public final class HCatConstants {
   public static final String HCAT_KEY_OUTPUT_INFO = HCAT_KEY_OUTPUT_BASE + ".info";
   public static final String HCAT_KEY_HIVE_CONF = HCAT_KEY_OUTPUT_BASE + ".hive.conf";
   public static final String HCAT_KEY_TOKEN_SIGNATURE = HCAT_KEY_OUTPUT_BASE + ".token.sig";
-
+  public static final String HCAT_KEY_JOBCLIENT_TOKEN_SIGNATURE = HCAT_KEY_OUTPUT_BASE + ".jobclient.token.sig";
+  public static final String HCAT_KEY_JOBCLIENT_TOKEN_STRFORM = HCAT_KEY_OUTPUT_BASE + ".jobclient.token.strform";
+
   public static final String HCAT_MSG_CLEAN_FREQ = "hcat.msg.clean.freq";
   public static final String HCAT_MSG_EXPIRY_DURATION = "hcat.msg.expiry.duration";
 
   public static final String HCAT_MSGBUS_TOPIC_NAME = "hcat.msgbus.topic.name";
   public static final String HCAT_MSGBUS_TOPIC_NAMING_POLICY = "hcat.msgbus.topic.naming.policy";
   public static final String HCAT_MSGBUS_TOPIC_PREFIX = "hcat.msgbus.topic.prefix";
+
+  public static final String HCAT_DYNAMIC_PTN_JOBID = HCAT_KEY_OUTPUT_BASE + "dynamic.jobid";
+  public static final boolean HCAT_IS_DYNAMIC_MAX_PTN_CHECK_ENABLED = false;
 
   // Message Bus related properties.
   public static final String HCAT_DEFAULT_TOPIC_PREFIX = "hcat";
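The new keys above are plain job-configuration entries; later hunks in this patch write them in HCatOutputFormat.setOutput() and read them back in the output committer. A minimal read-side sketch follows (the helper class and method names are hypothetical; only the HCatConstants identifiers and the described behaviour come from this diff):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hcatalog.common.HCatConstants;

  public class DynPtnConfProbe {
    // Returns true if HCatOutputFormat.setOutput() marked this job's Configuration
    // as a dynamic-partition write, i.e. not every partition key was supplied.
    public static boolean isDynamicWrite(Configuration conf) {
      return conf.get(HCatConstants.HCAT_DYNAMIC_PTN_JOBID) != null;
    }

    // The job-tracker delegation token is stashed under these keys so that the
    // committer can cancel it later (see the HCatOutputCommitter hunks below).
    public static String[] jobClientTokenInfo(Configuration conf) {
      return new String[] {
          conf.get(HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_STRFORM),
          conf.get(HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_SIGNATURE)
      };
    }
  }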
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatUtil.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatUtil.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatUtil.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatUtil.java Fri Jul 22 23:38:07 2011
@@ -28,21 +28,42 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
 import org.apache.hcatalog.data.schema.HCatFieldSchema;
 import org.apache.hcatalog.data.schema.HCatSchema;
 import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+import org.apache.hcatalog.mapreduce.HCatOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.thrift.TException;
 
 public class HCatUtil {
 
+//  static final private Log LOG = LogFactory.getLog(HCatUtil.class);
+
   public static boolean checkJobContextIfRunningFromBackend(JobContext j){
     if (j.getConfiguration().get("mapred.task.id", "").equals("")){
       return false;
@@ -256,4 +277,102 @@ public class HCatUtil {
     return true;
   }
 
+  public static Token getJobTrackerDelegationToken(Configuration conf, String userName) throws Exception {
+//    LOG.info("getJobTrackerDelegationToken("+conf+","+userName+")");
+    JobClient jcl = new JobClient(new JobConf(conf, HCatOutputFormat.class));
+    Token t = jcl.getDelegationToken(new Text(userName));
+//    LOG.info("got "+t);
+    return t;
+
+//    return null;
+  }
+
+  public static void cancelJobTrackerDelegationToken(String tokenStrForm, String tokenSignature) throws Exception {
+//    LOG.info("cancelJobTrackerDelegationToken("+tokenStrForm+","+tokenSignature+")");
+    JobClient jcl = new JobClient(new JobConf(new Configuration(), HCatOutputFormat.class));
+    Token t = extractJobTrackerToken(tokenStrForm,tokenSignature);
+//    LOG.info("canceling "+t);
+    try {
+      jcl.cancelDelegationToken(t);
+    }catch(Exception e){
+//      HCatUtil.logToken(LOG, "jcl token to cancel", t);
+      // ignore if token has already been invalidated.
+ } + } + + + public static Token + extractThriftToken(String tokenStrForm, String tokenSignature) throws MetaException, TException, IOException { +// LOG.info("extractThriftToken("+tokenStrForm+","+tokenSignature+")"); + Token t = new Token(); + t.decodeFromUrlString(tokenStrForm); + t.setService(new Text(tokenSignature)); +// LOG.info("returning "+t); + return t; + } + + public static Token + extractJobTrackerToken(String tokenStrForm, String tokenSignature) throws MetaException, TException, IOException { +// LOG.info("extractJobTrackerToken("+tokenStrForm+","+tokenSignature+")"); + Token t = + new Token(); + t.decodeFromUrlString(tokenStrForm); + t.setService(new Text(tokenSignature)); +// LOG.info("returning "+t); + return t; + } + + /** + * Logging stack trace + * @param logger + */ + public static void logStackTrace(Log logger) { + StackTraceElement[] stackTrace = new Exception().getStackTrace(); + for (int i = 1 ; i < stackTrace.length ; i++){ + logger.info("\t"+stackTrace[i].toString()); + } + } + + /** + * debug log the hive conf + * @param logger + * @param hc + */ + public static void logHiveConf(Log logger, HiveConf hc){ + logEntrySet(logger,"logging hiveconf:",hc.getAllProperties().entrySet()); + } + + + public static void logList(Log logger, String itemName, List list){ + logger.info(itemName+":"); + for (Object item : list){ + logger.info("\t["+item+"]"); + } + } + + public static void logMap(Log logger, String itemName, Map map){ + logEntrySet(logger,itemName,map.entrySet()); + } + + public static void logEntrySet(Log logger, String itemName, Set entrySet) { + logger.info(itemName+":"); + for (Entry e : entrySet){ + logger.info("\t["+e.getKey()+"]=>["+e.getValue()+"]"); + } + } + + public static void logAllTokens(Log logger, JobContext context) throws IOException { + for (Tokent : context.getCredentials().getAllTokens()){ + logToken(logger,"token",t); + } + } + + public static void logToken(Log logger, String itemName, Token t) throws IOException { + logger.info(itemName+":"); + logger.info("\tencodeToUrlString : "+t.encodeToUrlString()); + logger.info("\ttoString : "+t.toString()); + logger.info("\tkind : "+t.getKind()); + logger.info("\tservice : "+t.getService()); + } + } Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/har/HarOutputCommitterPostProcessor.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/har/HarOutputCommitterPostProcessor.java?rev=1149763&view=auto ============================================================================== --- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/har/HarOutputCommitterPostProcessor.java (added) +++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/har/HarOutputCommitterPostProcessor.java Fri Jul 22 23:38:07 2011 @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hcatalog.har; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.api.Constants; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.tools.HadoopArchives; +import org.apache.hadoop.util.ToolRunner; +import org.apache.hcatalog.common.HCatException; +import org.apache.hcatalog.common.HCatUtil; + +public class HarOutputCommitterPostProcessor { + +// static final private Log LOG = LogFactory.getLog(HarOutputCommitterPostProcessor.class); + + boolean isEnabled = false; + + public boolean isEnabled() { + return isEnabled; + } + + public void setEnabled(boolean enabled) { + this.isEnabled = enabled; + } + + + public void exec(JobContext context, Partition partition, Path partPath) throws IOException { +// LOG.info("Archiving partition ["+partPath.toString()+"]"); + makeHar(context,partPath.toUri().toString(),harFile(partPath)); + partition.getParameters().put(Constants.IS_ARCHIVED, "true"); + } + + public String harFile(Path ptnPath) throws IOException{ + String harFile = ptnPath.toString().replaceFirst("/+$", "") + ".har"; +// LOG.info("har file : " + harFile); + return harFile; + } + + public String getParentFSPath(Path ptnPath) throws IOException { + return ptnPath.toUri().getPath().replaceFirst("/+$", ""); + } + + public String getProcessedLocation(Path ptnPath) throws IOException { + String harLocn = ("har://" + ptnPath.toUri().getPath()).replaceFirst("/+$", "") + ".har" + Path.SEPARATOR; +// LOG.info("har location : " + harLocn); + return harLocn; + } + + + /** + * Creates a har file from the contents of a given directory, using that as root. 
+ * @param dir Directory to archive + * @param harName The HAR file to create + */ + public static void makeHar(JobContext context, String dir, String harFile) throws IOException{ +// Configuration conf = context.getConfiguration(); +// Credentials creds = context.getCredentials(); + +// HCatUtil.logAllTokens(LOG,context); + + int lastSep = harFile.lastIndexOf(Path.SEPARATOR_CHAR); + Path archivePath = new Path(harFile.substring(0,lastSep)); + final String[] args = { + "-archiveName", + harFile.substring(lastSep+1, harFile.length()), + "-p", + dir, + "*", + archivePath.toString() + }; +// for (String arg : args){ +// LOG.info("Args to har : "+ arg); +// } + try { + Configuration newConf = new Configuration(); + FileSystem fs = archivePath.getFileSystem(newConf); + + newConf.set("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION")); +// LOG.info("System.getenv(\"HADOOP_TOKEN_FILE_LOCATION\") =["+ System.getenv("HADOOP_TOKEN_FILE_LOCATION")+"]"); + +// for (FileStatus ds : fs.globStatus(new Path(dir, "*"))){ +// LOG.info("src : "+ds.getPath().toUri().toString()); +// } + + final HadoopArchives har = new HadoopArchives(newConf); + int rc = ToolRunner.run(har, args); + if (rc!= 0){ + throw new Exception("Har returned error code "+rc); + } + +// for (FileStatus hs : fs.globStatus(new Path(harFile, "*"))){ +// LOG.info("dest : "+hs.getPath().toUri().toString()); +// } +// doHarCheck(fs,harFile); +// LOG.info("Nuking " + dir); + fs.delete(new Path(dir), true); + } catch (Exception e){ + throw new HCatException("Error creating Har ["+harFile+"] from ["+dir+"]", e); + } + } + +} Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputCommitter.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputCommitter.java?rev=1149763&r1=1149762&r2=1149763&view=diff ============================================================================== --- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputCommitter.java (original) +++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputCommitter.java Fri Jul 22 23:38:07 2011 @@ -34,7 +34,7 @@ public abstract class HCatBaseOutputComm /** The underlying output committer */ protected final OutputCommitter baseCommitter; - public HCatBaseOutputCommitter(OutputCommitter baseCommitter) { + public HCatBaseOutputCommitter(JobContext context, OutputCommitter baseCommitter) { this.baseCommitter = baseCommitter; } Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java?rev=1149763&r1=1149762&r2=1149763&view=diff ============================================================================== --- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java (original) +++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java Fri Jul 22 23:38:07 2011 @@ -20,9 +20,14 @@ package org.apache.hcatalog.mapreduce; import java.io.IOException; import java.util.ArrayList; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.io.Writable; import 
org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.JobContext; @@ -36,6 +41,8 @@ import org.apache.hcatalog.data.schema.H public abstract class HCatBaseOutputFormat extends OutputFormat, HCatRecord> { +// static final private Log LOG = LogFactory.getLog(HCatBaseOutputFormat.class); + /** * Gets the table schema for the table specified in the HCatOutputFormat.setOutput call * on the specified job context. @@ -83,7 +90,7 @@ public abstract class HCatBaseOutputForm * @return the OutputJobInfo object * @throws IOException the IO exception */ - static OutputJobInfo getJobInfo(JobContext jobContext) throws IOException { + public static OutputJobInfo getJobInfo(JobContext jobContext) throws IOException { String jobString = jobContext.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO); if( jobString == null ) { throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED); @@ -102,33 +109,107 @@ public abstract class HCatBaseOutputForm @SuppressWarnings("unchecked") static HCatOutputStorageDriver getOutputDriverInstance( JobContext jobContext, OutputJobInfo jobInfo) throws IOException { + return getOutputDriverInstance(jobContext,jobInfo,(List)null); + } + + /** + * Gets the output storage driver instance, with allowing specification of missing dynamic partvals + * @param jobContext the job context + * @param jobInfo the output job info + * @return the output driver instance + * @throws IOException + */ + @SuppressWarnings("unchecked") + static HCatOutputStorageDriver getOutputDriverInstance( + JobContext jobContext, OutputJobInfo jobInfo, List dynamicPartVals) throws IOException { try { Class driverClass = (Class) Class.forName(jobInfo.getStorerInfo().getOutputSDClass()); HCatOutputStorageDriver driver = driverClass.newInstance(); + Map partitionValues = jobInfo.getTableInfo().getPartitionValues(); + String location = jobInfo.getLocation(); + + if (dynamicPartVals != null){ + // dynamic part vals specified + List dynamicPartKeys = jobInfo.getTableInfo().getDynamicPartitioningKeys(); + if (dynamicPartVals.size() != dynamicPartKeys.size()){ + throw new HCatException(ErrorType.ERROR_INVALID_PARTITION_VALUES, + "Unable to instantiate dynamic partitioning storage driver, mismatch between" + + " number of partition values obtained["+dynamicPartVals.size() + + "] and number of partition values required["+dynamicPartKeys.size()+"]"); + } + for (int i = 0; i < dynamicPartKeys.size(); i++){ + partitionValues.put(dynamicPartKeys.get(i), dynamicPartVals.get(i)); + } + + // re-home location, now that we know the rest of the partvals + Table table = jobInfo.getTable(); + + List partitionCols = new ArrayList(); + for(FieldSchema schema : table.getPartitionKeys()) { + partitionCols.add(schema.getName()); + } + + location = driver.getOutputLocation(jobContext, + table.getSd().getLocation() , partitionCols, + partitionValues,jobContext.getConfiguration().get(HCatConstants.HCAT_DYNAMIC_PTN_JOBID)); + } + //Initialize the storage driver driver.setSchema(jobContext, jobInfo.getOutputSchema()); - driver.setPartitionValues(jobContext, jobInfo.getTableInfo().getPartitionValues()); - driver.setOutputPath(jobContext, jobInfo.getLocation()); + driver.setPartitionValues(jobContext, partitionValues); + driver.setOutputPath(jobContext, location); + +// HCatUtil.logMap(LOG,"Setting outputPath ["+location+"] for ",partitionValues); driver.initialize(jobContext, jobInfo.getStorerInfo().getProperties()); return driver; } catch(Exception e) { + if (e instanceof HCatException){ + throw (HCatException)e; 
+ }else{ throw new HCatException(ErrorType.ERROR_INIT_STORAGE_DRIVER, e); + } } } + /** + * Gets the output storage driver instance, with allowing specification + * of partvals from which it picks the dynamic partvals + * @param jobContext the job context + * @param jobInfo the output job info + * @return the output driver instance + * @throws IOException + */ + + protected static HCatOutputStorageDriver getOutputDriverInstance( + JobContext context, OutputJobInfo jobInfo, + Map fullPartSpec) throws IOException { + List dynamicPartKeys = jobInfo.getTableInfo().getDynamicPartitioningKeys(); + if ((dynamicPartKeys == null)||(dynamicPartKeys.isEmpty())){ + return getOutputDriverInstance(context,jobInfo,(List)null); + }else{ + List dynKeyVals = new ArrayList(); + for (String dynamicPartKey : dynamicPartKeys){ + dynKeyVals.add(fullPartSpec.get(dynamicPartKey)); + } + return getOutputDriverInstance(context,jobInfo,dynKeyVals); + } + } + + protected static void setPartDetails(OutputJobInfo jobInfo, final HCatSchema schema, Map partMap) throws HCatException, IOException { List posOfPartCols = new ArrayList(); + List posOfDynPartCols = new ArrayList(); // If partition columns occur in data, we want to remove them. // So, find out positions of partition columns in schema provided by user. // We also need to update the output Schema with these deletions. - + // Note that, output storage drivers never sees partition columns in data // or schema. @@ -140,8 +221,26 @@ public abstract class HCatBaseOutputForm schemaWithoutParts.remove(schema.get(partKey)); } } + + // Also, if dynamic partitioning is being used, we want to + // set appropriate list of columns for the columns to be dynamically specified. + // These would be partition keys too, so would also need to be removed from + // output schema and partcols + + if (jobInfo.getTableInfo().isDynamicPartitioningUsed()){ + for (String partKey : jobInfo.getTableInfo().getDynamicPartitioningKeys()){ + Integer idx; + if((idx = schema.getPosition(partKey)) != null){ + posOfPartCols.add(idx); + posOfDynPartCols.add(idx); + schemaWithoutParts.remove(schema.get(partKey)); + } + } + } + HCatUtil.validatePartitionSchema(jobInfo.getTable(), schemaWithoutParts); jobInfo.setPosOfPartCols(posOfPartCols); + jobInfo.setPosOfDynPartCols(posOfDynPartCols); jobInfo.setOutputSchema(schemaWithoutParts); } } Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputCommitter.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputCommitter.java?rev=1149763&r1=1149762&r2=1149763&view=diff ============================================================================== --- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputCommitter.java (original) +++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputCommitter.java Fri Jul 22 23:38:07 2011 @@ -40,6 +40,7 @@ import org.apache.hadoop.hive.ql.parse.E import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hcatalog.common.ErrorType; import org.apache.hcatalog.common.HCatException; @@ -47,8 +48,8 @@ public class HCatEximOutputCommitter ext private static final Log LOG = LogFactory.getLog(HCatEximOutputCommitter.class); - public HCatEximOutputCommitter(OutputCommitter baseCommitter) { - super(baseCommitter); + public 
HCatEximOutputCommitter(JobContext context, OutputCommitter baseCommitter) { + super(context,baseCommitter); } @Override Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputFormat.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputFormat.java?rev=1149763&r1=1149762&r2=1149763&view=diff ============================================================================== --- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputFormat.java (original) +++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputFormat.java Fri Jul 22 23:38:07 2011 @@ -91,7 +91,7 @@ public class HCatEximOutputFormat extend @Override public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException { OutputFormat, ? super Writable> outputFormat = getOutputFormat(context); - return new HCatEximOutputCommitter(outputFormat.getOutputCommitter(context)); + return new HCatEximOutputCommitter(context,outputFormat.getOutputCommitter(context)); } public static void setOutput(Job job, String dbname, String tablename, String location, Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputCommitter.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputCommitter.java?rev=1149763&r1=1149762&r2=1149763&view=diff ============================================================================== --- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputCommitter.java (original) +++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputCommitter.java Fri Jul 22 23:38:07 2011 @@ -21,26 +21,35 @@ import java.io.IOException; import java.net.URI; import java.util.ArrayList; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.Warehouse; +import org.apache.hadoop.hive.metastore.api.Constants; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.InvalidOperationException; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.JobStatus.State; +import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.security.AccessControlException; import org.apache.hcatalog.common.ErrorType; import org.apache.hcatalog.common.HCatConstants; @@ -49,49 +58,105 @@ import org.apache.hcatalog.common.HCatUt import org.apache.hcatalog.data.schema.HCatFieldSchema; import org.apache.hcatalog.data.schema.HCatSchema; import 
org.apache.hcatalog.data.schema.HCatSchemaUtils; +import org.apache.hcatalog.har.HarOutputCommitterPostProcessor; import org.apache.thrift.TException; public class HCatOutputCommitter extends OutputCommitter { +// static final private Log LOG = LogFactory.getLog(HCatOutputCommitter.class); + /** The underlying output committer */ private final OutputCommitter baseCommitter; - public HCatOutputCommitter(OutputCommitter baseCommitter) { + private final boolean dynamicPartitioningUsed; + private boolean partitionsDiscovered; + + private Map> partitionsDiscoveredByPath; + private Map storageDriversDiscoveredByPath; + + HarOutputCommitterPostProcessor harProcessor = new HarOutputCommitterPostProcessor(); + + private String ptnRootLocation = null; + + public HCatOutputCommitter(JobContext context, OutputCommitter baseCommitter) throws IOException { + OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context); + dynamicPartitioningUsed = jobInfo.getTableInfo().isDynamicPartitioningUsed(); + if (!dynamicPartitioningUsed){ this.baseCommitter = baseCommitter; + this.partitionsDiscovered = true; + }else{ + this.baseCommitter = null; + this.partitionsDiscovered = false; + } } @Override public void abortTask(TaskAttemptContext context) throws IOException { + if (!dynamicPartitioningUsed){ baseCommitter.abortTask(context); + } } @Override public void commitTask(TaskAttemptContext context) throws IOException { + if (!dynamicPartitioningUsed){ baseCommitter.commitTask(context); + }else{ + // called explicitly through HCatRecordWriter.close() if dynamic + } } @Override public boolean needsTaskCommit(TaskAttemptContext context) throws IOException { + if (!dynamicPartitioningUsed){ return baseCommitter.needsTaskCommit(context); + }else{ + // called explicitly through HCatRecordWriter.close() if dynamic - return false by default + return false; + } } @Override public void setupJob(JobContext context) throws IOException { - if( baseCommitter != null ) { - baseCommitter.setupJob(context); - } + if( baseCommitter != null ) { + baseCommitter.setupJob(context); + } + // in dynamic usecase, called through HCatRecordWriter } @Override public void setupTask(TaskAttemptContext context) throws IOException { + if (!dynamicPartitioningUsed){ baseCommitter.setupTask(context); - } + }else{ + // called explicitly through HCatRecordWriter.write() if dynamic + } + } @Override public void abortJob(JobContext jobContext, State state) throws IOException { + + if (dynamicPartitioningUsed){ + discoverPartitions(jobContext); + } + if(baseCommitter != null) { baseCommitter.abortJob(jobContext, state); + }else{ + if (dynamicPartitioningUsed){ + for(HCatOutputStorageDriver baseOsd : storageDriversDiscoveredByPath.values()){ + try { + baseOsd.abortOutputCommitterJob( + new TaskAttemptContext( + jobContext.getConfiguration(), TaskAttemptID.forName(ptnRootLocation) + ),state); + } catch (Exception e) { + throw new IOException(e); + } + } + } } + OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext); try { @@ -106,6 +171,13 @@ public class HCatOutputCommitter extends (HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) { client.cancelDelegationToken(tokenStrForm); } + + String jcTokenStrForm = jobContext.getConfiguration().get(HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_STRFORM); + String jcTokenSignature = jobContext.getConfiguration().get(HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_SIGNATURE); + if(jcTokenStrForm != null && jcTokenSignature != null) { + HCatUtil.cancelJobTrackerDelegationToken(tokenStrForm,jcTokenSignature); + } + } 
catch(Exception e) { if( e instanceof HCatException ) { throw (HCatException) e; @@ -114,8 +186,16 @@ public class HCatOutputCommitter extends } } - Path src = new Path(jobInfo.getLocation()); + Path src; + if (dynamicPartitioningUsed){ + src = new Path(getPartitionRootLocation( + jobInfo.getLocation().toString(),jobInfo.getTable().getPartitionKeysSize() + )); + }else{ + src = new Path(jobInfo.getLocation()); + } FileSystem fs = src.getFileSystem(jobContext.getConfiguration()); +// LOG.warn("abortJob about to delete ["+src.toString() +"]"); fs.delete(src, true); } @@ -130,6 +210,10 @@ public class HCatOutputCommitter extends @Override public void commitJob(JobContext jobContext) throws IOException { + if (dynamicPartitioningUsed){ + discoverPartitions(jobContext); + } + if(baseCommitter != null) { baseCommitter.commitJob(jobContext); } @@ -153,12 +237,15 @@ public class HCatOutputCommitter extends @Override public void cleanupJob(JobContext context) throws IOException { + if (dynamicPartitioningUsed){ + discoverPartitions(context); + } + OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context); Configuration conf = context.getConfiguration(); Table table = jobInfo.getTable(); - StorageDescriptor tblSD = table.getSd(); - Path tblPath = new Path(tblSD.getLocation()); + Path tblPath = new Path(table.getSd().getLocation()); FileSystem fs = tblPath.getFileSystem(conf); if( table.getPartitionKeys().size() == 0 ) { @@ -166,75 +253,116 @@ public class HCatOutputCommitter extends if( baseCommitter != null ) { baseCommitter.cleanupJob(context); + }else{ + if (dynamicPartitioningUsed){ + for(HCatOutputStorageDriver baseOsd : storageDriversDiscoveredByPath.values()){ + try { + baseOsd.cleanupOutputCommitterJob( + new TaskAttemptContext( + context.getConfiguration(), TaskAttemptID.forName(ptnRootLocation) + )); + } catch (Exception e) { + throw new IOException(e); + } + } + } } - + //Move data from temp directory the actual table directory //No metastore operation required. 
Path src = new Path(jobInfo.getLocation()); - moveTaskOutputs(fs, src, src, tblPath); + moveTaskOutputs(fs, src, src, tblPath,false); fs.delete(src, true); return; } HiveMetaStoreClient client = null; List values = null; - boolean partitionAdded = false; HCatTableInfo tableInfo = jobInfo.getTableInfo(); + List partitionsAdded = new ArrayList(); + try { client = HCatOutputFormat.createHiveClient(tableInfo.getServerUri(), conf); StorerInfo storer = InitializeInput.extractStorerInfo(table.getSd(),table.getParameters()); - Partition partition = new Partition(); - partition.setDbName(tableInfo.getDatabaseName()); - partition.setTableName(tableInfo.getTableName()); - partition.setSd(new StorageDescriptor(tblSD)); - partition.getSd().setLocation(jobInfo.getLocation()); - updateTableSchema(client, table, jobInfo.getOutputSchema()); + + FileStatus tblStat = fs.getFileStatus(tblPath); + String grpName = tblStat.getGroup(); + FsPermission perms = tblStat.getPermission(); - List fields = new ArrayList(); - for(HCatFieldSchema fieldSchema : jobInfo.getOutputSchema().getFields()) { - fields.add(HCatSchemaUtils.getFieldSchema(fieldSchema)); + List partitionsToAdd = new ArrayList(); + if (!dynamicPartitioningUsed){ + partitionsToAdd.add( + constructPartition( + context, + tblPath.toString(), tableInfo.getPartitionValues() + ,jobInfo.getOutputSchema(), getStorerParameterMap(storer) + ,table, fs + ,grpName,perms)); + }else{ + for (Entry> entry : partitionsDiscoveredByPath.entrySet()){ + partitionsToAdd.add( + constructPartition( + context, + getPartitionRootLocation(entry.getKey(),entry.getValue().size()), entry.getValue() + ,jobInfo.getOutputSchema(), getStorerParameterMap(storer) + ,table, fs + ,grpName,perms)); + } } - partition.getSd().setCols(fields); - - Map partKVs = tableInfo.getPartitionValues(); - //Get partition value list - partition.setValues(getPartitionValueList(table,partKVs)); - - Map params = new HashMap(); - params.put(HCatConstants.HCAT_ISD_CLASS, storer.getInputSDClass()); - params.put(HCatConstants.HCAT_OSD_CLASS, storer.getOutputSDClass()); + //Publish the new partition(s) + if (dynamicPartitioningUsed && harProcessor.isEnabled() && (!partitionsToAdd.isEmpty())){ + + Path src = new Path(ptnRootLocation); + + // check here for each dir we're copying out, to see if it already exists, error out if so + moveTaskOutputs(fs, src, src, tblPath,true); + + moveTaskOutputs(fs, src, src, tblPath,false); + fs.delete(src, true); + + +// for (Partition partition : partitionsToAdd){ +// partitionsAdded.add(client.add_partition(partition)); +// // currently following add_partition instead of add_partitions because latter isn't +// // all-or-nothing and we want to be able to roll back partitions we added if need be. +// } - //Copy table level hcat.* keys to the partition - for(Map.Entry entry : storer.getProperties().entrySet()) { - params.put(entry.getKey().toString(), entry.getValue().toString()); - } + try { + client.add_partitions(partitionsToAdd); + partitionsAdded = partitionsToAdd; + } catch (Exception e){ + // There was an error adding partitions : rollback fs copy and rethrow + for (Partition p : partitionsToAdd){ + Path ptnPath = new Path(harProcessor.getParentFSPath(new Path(p.getSd().getLocation()))); + if (fs.exists(ptnPath)){ + fs.delete(ptnPath,true); + } + } + throw e; + } - partition.setParameters(params); + }else{ + // no harProcessor, regular operation - // Sets permissions and group name on partition dirs. 
- FileStatus tblStat = fs.getFileStatus(tblPath); - String grpName = tblStat.getGroup(); - FsPermission perms = tblStat.getPermission(); - Path partPath = tblPath; - for(FieldSchema partKey : table.getPartitionKeys()){ - partPath = constructPartialPartPath(partPath, partKey.getName().toLowerCase(), partKVs); - fs.setPermission(partPath, perms); - try{ - fs.setOwner(partPath, null, grpName); - } catch(AccessControlException ace){ - // log the messages before ignoring. Currently, logging is not built in Hcatalog. + // No duplicate partition publish case to worry about because we'll + // get a AlreadyExistsException here if so, and appropriately rollback + + client.add_partitions(partitionsToAdd); + partitionsAdded = partitionsToAdd; + + if (dynamicPartitioningUsed && (partitionsAdded.size()>0)){ + Path src = new Path(ptnRootLocation); + moveTaskOutputs(fs, src, src, tblPath,false); + fs.delete(src, true); } + } - - //Publish the new partition - client.add_partition(partition); - partitionAdded = true; //publish to metastore done - + if( baseCommitter != null ) { baseCommitter.cleanupJob(context); } @@ -247,13 +375,24 @@ public class HCatOutputCommitter extends (HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) { client.cancelDelegationToken(tokenStrForm); } + + String jcTokenStrForm = + context.getConfiguration().get(HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_STRFORM); + String jcTokenSignature = + context.getConfiguration().get(HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_SIGNATURE); + if(jcTokenStrForm != null && jcTokenSignature != null) { + HCatUtil.cancelJobTrackerDelegationToken(tokenStrForm,jcTokenSignature); + } + } catch (Exception e) { - if( partitionAdded ) { + if( partitionsAdded.size() > 0 ) { try { //baseCommitter.cleanupJob failed, try to clean up the metastore + for (Partition p : partitionsAdded){ client.dropPartition(tableInfo.getDatabaseName(), - tableInfo.getTableName(), values); + tableInfo.getTableName(), p.getValues()); + } } catch(Exception te) { //Keep cause as the original exception throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, e); @@ -272,6 +411,114 @@ public class HCatOutputCommitter extends } } + private String getPartitionRootLocation(String ptnLocn,int numPtnKeys) { + if (ptnRootLocation == null){ + // we only need to calculate it once, it'll be the same for other partitions in this job. + Path ptnRoot = new Path(ptnLocn); + for (int i = 0; i < numPtnKeys; i++){ +// LOG.info("Getting parent of "+ptnRoot.getName()); + ptnRoot = ptnRoot.getParent(); + } + ptnRootLocation = ptnRoot.toString(); + } +// LOG.info("Returning final parent : "+ptnRootLocation); + return ptnRootLocation; + } + + /** + * Generate partition metadata object to be used to add to metadata. + * @param partLocnRoot The table-equivalent location root of the partition + * (temporary dir if dynamic partition, table dir if static) + * @param partKVs The keyvalue pairs that form the partition + * @param outputSchema The output schema for the partition + * @param params The parameters to store inside the partition + * @param table The Table metadata object under which this Partition will reside + * @param fs FileSystem object to operate on the underlying filesystem + * @param grpName Group name that owns the table dir + * @param perms FsPermission that's the default permission of the table dir. 
+ * @return Constructed Partition metadata object + * @throws IOException + */ + + private Partition constructPartition( + JobContext context, + String partLocnRoot, Map partKVs, + HCatSchema outputSchema, Map params, + Table table, FileSystem fs, + String grpName, FsPermission perms) throws IOException { + + StorageDescriptor tblSD = table.getSd(); + + Partition partition = new Partition(); + partition.setDbName(table.getDbName()); + partition.setTableName(table.getTableName()); + partition.setSd(new StorageDescriptor(tblSD)); + + List fields = new ArrayList(); + for(HCatFieldSchema fieldSchema : outputSchema.getFields()) { + fields.add(HCatSchemaUtils.getFieldSchema(fieldSchema)); + } + + partition.getSd().setCols(fields); + + partition.setValues(getPartitionValueList(table,partKVs)); + + partition.setParameters(params); + + // Sets permissions and group name on partition dirs. + + Path partPath = new Path(partLocnRoot); + for(FieldSchema partKey : table.getPartitionKeys()){ + partPath = constructPartialPartPath(partPath, partKey.getName().toLowerCase(), partKVs); +// LOG.info("Setting perms for "+partPath.toString()); + fs.setPermission(partPath, perms); + try{ + fs.setOwner(partPath, null, grpName); + } catch(AccessControlException ace){ + // log the messages before ignoring. Currently, logging is not built in Hcatalog. +// LOG.warn(ace); + } + } + if (dynamicPartitioningUsed){ + String dynamicPartitionDestination = getFinalDynamicPartitionDestination(table,partKVs); + if (harProcessor.isEnabled()){ + harProcessor.exec(context, partition, partPath); + partition.getSd().setLocation( + harProcessor.getProcessedLocation(new Path(dynamicPartitionDestination))); + }else{ + partition.getSd().setLocation(dynamicPartitionDestination); + } + }else{ + partition.getSd().setLocation(partPath.toString()); + } + + return partition; + } + + + + private String getFinalDynamicPartitionDestination(Table table, Map partKVs) { + // file:///tmp/hcat_junit_warehouse/employee/_DYN0.7770480401313761/emp_country=IN/emp_state=KA -> + // file:///tmp/hcat_junit_warehouse/employee/emp_country=IN/emp_state=KA + Path partPath = new Path(table.getSd().getLocation()); + for(FieldSchema partKey : table.getPartitionKeys()){ + partPath = constructPartialPartPath(partPath, partKey.getName().toLowerCase(), partKVs); + } + return partPath.toString(); + } + + private Map getStorerParameterMap(StorerInfo storer) { + Map params = new HashMap(); + params.put(HCatConstants.HCAT_ISD_CLASS, storer.getInputSDClass()); + params.put(HCatConstants.HCAT_OSD_CLASS, storer.getOutputSDClass()); + + //Copy table level hcat.* keys to the partition + for(Map.Entry entry : storer.getProperties().entrySet()) { + params.put(entry.getKey().toString(), entry.getValue().toString()); + } + return params; + } + private Path constructPartialPartPath(Path partialPath, String partKey, Map partKVs){ StringBuilder sb = new StringBuilder(FileUtils.escapePathName(partKey)); @@ -344,31 +591,42 @@ public class HCatOutputCommitter extends * @param file the file to move * @param src the source directory * @param dest the target directory + * @param dryRun - a flag that simply tests if this move would succeed or not based + * on whether other files exist where we're trying to copy * @throws IOException */ private void moveTaskOutputs(FileSystem fs, Path file, Path src, - Path dest) throws IOException { + Path dest, boolean dryRun) throws IOException { if (fs.isFile(file)) { Path finalOutputPath = getFinalPath(file, src, dest); - if (!fs.rename(file, 
finalOutputPath)) { - if (!fs.delete(finalOutputPath, true)) { - throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Failed to delete existing path " + finalOutputPath); + if (dryRun){ +// LOG.info("Testing if moving ["+file+"] to ["+finalOutputPath+"] would cause a problem"); + if (fs.exists(finalOutputPath)){ + throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Data already exists in " + finalOutputPath + ", duplicate publish possible."); } + }else{ +// LOG.info("Moving ["+file+"] to ["+finalOutputPath+"]"); if (!fs.rename(file, finalOutputPath)) { - throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Failed to move output to " + dest); + if (!fs.delete(finalOutputPath, true)) { + throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Failed to delete existing path " + finalOutputPath); + } + if (!fs.rename(file, finalOutputPath)) { + throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Failed to move output to " + dest); + } } } } else if(fs.getFileStatus(file).isDir()) { FileStatus[] paths = fs.listStatus(file); Path finalOutputPath = getFinalPath(file, src, dest); - fs.mkdirs(finalOutputPath); - + if (!dryRun){ + fs.mkdirs(finalOutputPath); + } if (paths != null) { for (FileStatus path : paths) { - moveTaskOutputs(fs, path.getPath(), src, dest); + moveTaskOutputs(fs, path.getPath(), src, dest,dryRun); } } } @@ -398,4 +656,72 @@ public class HCatOutputCommitter extends } } + /** + * Run to discover dynamic partitions available + */ + private void discoverPartitions(JobContext context) throws IOException { + if (!partitionsDiscovered){ + // LOG.info("discover ptns called"); + + OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context); + + harProcessor.setEnabled(jobInfo.getHarRequested()); + + List dynamicPartCols = jobInfo.getPosOfDynPartCols(); + int maxDynamicPartitions = jobInfo.getMaxDynamicPartitions(); + + Path loadPath = new Path(jobInfo.getLocation()); + FileSystem fs = loadPath.getFileSystem(context.getConfiguration()); + + // construct a path pattern (e.g., /*/*) to find all dynamically generated paths + + String dynPathSpec = loadPath.toUri().getPath(); + dynPathSpec = dynPathSpec.replaceAll("__HIVE_DEFAULT_PARTITION__", "*"); + // TODO : replace this with a param pull from HiveConf + + // LOG.info("Searching for "+dynPathSpec); + Path pathPattern = new Path(loadPath, dynPathSpec); + FileStatus[] status = fs.globStatus(pathPattern); + + partitionsDiscoveredByPath = new LinkedHashMap>(); + storageDriversDiscoveredByPath = new LinkedHashMap(); + + + if (status.length == 0) { + // LOG.warn("No partition found genereated by dynamic partitioning in [" + // +loadPath+"] with depth["+jobInfo.getTable().getPartitionKeysSize() + // +"], dynSpec["+dynPathSpec+"]"); + }else{ + if ((maxDynamicPartitions != -1) && (status.length > maxDynamicPartitions)){ + this.partitionsDiscovered = true; + throw new HCatException(ErrorType.ERROR_TOO_MANY_DYNAMIC_PTNS, + "Number of dynamic partitions being created " + + "exceeds configured max allowable partitions[" + + maxDynamicPartitions + + "], increase parameter [" + + HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname + + "] if needed."); + } + + for (FileStatus st : status){ + LinkedHashMap fullPartSpec = new LinkedHashMap(); + Warehouse.makeSpecFromName(fullPartSpec, st.getPath()); + partitionsDiscoveredByPath.put(st.getPath().toString(),fullPartSpec); + storageDriversDiscoveredByPath.put(st.getPath().toString(), + HCatOutputFormat.getOutputDriverInstance(context, jobInfo, fullPartSpec)); + } + } + + // for (Entry> spec : 
partitionsDiscoveredByPath.entrySet()){ + // LOG.info("Partition "+ spec.getKey()); + // for (Entry e : spec.getValue().entrySet()){ + // LOG.info(e.getKey() + "=>" +e.getValue()); + // } + // } + + this.partitionsDiscovered = true; + } + } + + } Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java?rev=1149763&r1=1149762&r2=1149763&view=diff ============================================================================== --- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java (original) +++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java Fri Jul 22 23:38:07 2011 @@ -21,11 +21,14 @@ package org.apache.hcatalog.mapreduce; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Map.Entry; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -54,6 +57,7 @@ import org.apache.hadoop.security.UserGr import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenSelector; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier; import org.apache.hcatalog.common.ErrorType; import org.apache.hcatalog.common.HCatConstants; import org.apache.hcatalog.common.HCatException; @@ -66,9 +70,15 @@ import org.apache.thrift.TException; * and should be given as null. The value is the HCatRecord to write.*/ public class HCatOutputFormat extends HCatBaseOutputFormat { +// static final private Log LOG = LogFactory.getLog(HCatOutputFormat.class); + /** The directory under which data is initially written for a non partitioned table */ protected static final String TEMP_DIR_NAME = "_TEMP"; - private static Map> tokenMap = new HashMap>(); + + /** */ + protected static final String DYNTEMP_DIR_NAME = "_DYN"; + + private static Map> tokenMap = new HashMap>(); private static final PathFilter hiddenFileFilter = new PathFilter(){ public boolean accept(Path p){ @@ -76,6 +86,9 @@ public class HCatOutputFormat extends HC return !name.startsWith("_") && !name.startsWith("."); } }; + + private static int maxDynamicPartitions; + private static boolean harRequested; /** * Set the info about the output to write for the Job. This queries the metadata server @@ -90,17 +103,58 @@ public class HCatOutputFormat extends HC try { - Configuration conf = job.getConfiguration(); + Configuration conf = job.getConfiguration(); client = createHiveClient(outputInfo.getServerUri(), conf); Table table = client.getTable(outputInfo.getDatabaseName(), outputInfo.getTableName()); - if( outputInfo.getPartitionValues() == null ) { + if (table.getPartitionKeysSize() == 0 ){ + if ((outputInfo.getPartitionValues() != null) && (!outputInfo.getPartitionValues().isEmpty())){ + // attempt made to save partition values in non-partitioned table - throw error. 
+ throw new HCatException(ErrorType.ERROR_INVALID_PARTITION_VALUES, + "Partition values specified for non-partitioned table"); + } + // non-partitioned table outputInfo.setPartitionValues(new HashMap()); + } else { - //Convert user specified map to have lower case key names + // partitioned table, we expect partition values + // convert user specified map to have lower case key names Map valueMap = new HashMap(); - for(Map.Entry entry : outputInfo.getPartitionValues().entrySet()) { - valueMap.put(entry.getKey().toLowerCase(), entry.getValue()); + if (outputInfo.getPartitionValues() != null){ + for(Map.Entry entry : outputInfo.getPartitionValues().entrySet()) { + valueMap.put(entry.getKey().toLowerCase(), entry.getValue()); + } + } + + if ( + (outputInfo.getPartitionValues() == null) + || (outputInfo.getPartitionValues().size() < table.getPartitionKeysSize()) + ){ + // dynamic partition usecase - partition values were null, or not all were specified + // need to figure out which keys are not specified. + List dynamicPartitioningKeys = new ArrayList(); + boolean firstItem = true; + for (FieldSchema fs : table.getPartitionKeys()){ + if (!valueMap.containsKey(fs.getName().toLowerCase())){ + dynamicPartitioningKeys.add(fs.getName().toLowerCase()); + } + } + + if (valueMap.size() + dynamicPartitioningKeys.size() != table.getPartitionKeysSize()){ + // If this isn't equal, then bogus key values have been inserted, error out. + throw new HCatException(ErrorType.ERROR_INVALID_PARTITION_VALUES,"Invalid partition keys specified"); + } + + outputInfo.setDynamicPartitioningKeys(dynamicPartitioningKeys); + String dynHash; + if ((dynHash = conf.get(HCatConstants.HCAT_DYNAMIC_PTN_JOBID)) == null){ + dynHash = String.valueOf(Math.random()); +// LOG.info("New dynHash : ["+dynHash+"]"); +// }else{ +// LOG.info("Old dynHash : ["+dynHash+"]"); + } + conf.set(HCatConstants.HCAT_DYNAMIC_PTN_JOBID, dynHash); + } outputInfo.setPartitionValues(valueMap); @@ -125,11 +179,13 @@ public class HCatOutputFormat extends HC String tblLocation = tblSD.getLocation(); String location = driver.getOutputLocation(job, tblLocation, partitionCols, - outputInfo.getPartitionValues()); + outputInfo.getPartitionValues(),conf.get(HCatConstants.HCAT_DYNAMIC_PTN_JOBID)); //Serialize the output info into the configuration OutputJobInfo jobInfo = new OutputJobInfo(outputInfo, tableSchema, tableSchema, storerInfo, location, table); + jobInfo.setHarRequested(harRequested); + jobInfo.setMaximumDynamicPartitions(maxDynamicPartitions); conf.set(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(jobInfo)); Path tblPath = new Path(tblLocation); @@ -176,6 +232,7 @@ public class HCatOutputFormat extends HC // TableInfo, we can have as many tokens as there are stores and the TokenSelector // will correctly pick the right tokens which the committer will use and // cancel. 
+ String tokenSignature = getTokenSignature(outputInfo); if(tokenMap.get(tokenSignature) == null) { // get delegation tokens from hcat server and store them into the "job" @@ -183,19 +240,32 @@ public class HCatOutputFormat extends HC // hcat // when the JobTracker in Hadoop MapReduce starts supporting renewal of // arbitrary tokens, the renewer should be the principal of the JobTracker - String tokenStrForm = client.getDelegationToken(ugi.getUserName()); - Token t = new Token(); - t.decodeFromUrlString(tokenStrForm); - t.setService(new Text(tokenSignature)); - tokenMap.put(tokenSignature, t); + tokenMap.put(tokenSignature, HCatUtil.extractThriftToken( + client.getDelegationToken(ugi.getUserName()), + tokenSignature)); + } + + String jcTokenSignature = "jc."+tokenSignature; + if(tokenMap.get(jcTokenSignature) == null) { + tokenMap.put(jcTokenSignature, + HCatUtil.getJobTrackerDelegationToken(conf,ugi.getUserName())); } + job.getCredentials().addToken(new Text(ugi.getUserName() + tokenSignature), tokenMap.get(tokenSignature)); // this will be used by the outputcommitter to pass on to the metastore client // which in turn will pass on to the TokenSelector so that it can select // the right token. + job.getCredentials().addToken(new Text(ugi.getUserName() + jcTokenSignature), + tokenMap.get(jcTokenSignature)); + job.getConfiguration().set(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE, tokenSignature); - } + job.getConfiguration().set(HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_SIGNATURE, jcTokenSignature); + job.getConfiguration().set(HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_STRFORM, tokenMap.get(jcTokenSignature).encodeToUrlString()); + +// LOG.info("Set hive dt["+tokenSignature+"]"); +// LOG.info("Set jt dt["+jcTokenSignature+"]"); + } } } catch(Exception e) { if( e instanceof HCatException ) { @@ -207,10 +277,10 @@ public class HCatOutputFormat extends HC if( client != null ) { client.close(); } +// HCatUtil.logAllTokens(LOG,job); } } - // a signature string to associate with a HCatTableInfo - essentially // a concatenation of dbname, tablename and partition keyvalues. private static String getTokenSignature(HCatTableInfo outputInfo) { @@ -232,11 +302,10 @@ public class HCatOutputFormat extends HC return result.toString(); } - - /** * Handles duplicate publish of partition. Fails if partition already exists. * For non partitioned tables, fails if files are present in table directory. 
+   * For dynamic partitioned publish, does nothing - the check would need to be done at RecordWriter time
   * @param job the job
   * @param outputInfo the output info
   * @param client the metastore client
   * @throws IOException
   */
  private static void handleDuplicatePublish(Job job, HCatTableInfo outputInfo,
      HiveMetaStoreClient client, Table table) throws IOException, MetaException, TException {
-    List<String> partitionValues = HCatOutputCommitter.getPartitionValueList(
-        table, outputInfo.getPartitionValues());
+
+    /*
+     * For a fully specified partition spec, strictly check for existing partitions in the metadata.
+     * For unpartitioned tables, check for files in the table directory.
+     * For partially specified (dynamic) partition specs:
+     *   file checks would be needed at the start of each partition write instead,
+     *   since metadata checks can get very expensive (fat conf) if
+     *   there are a large number of partitions that match the partial specification
+     */
    if( table.getPartitionKeys().size() > 0 ) {
-      //For partitioned table, fail if partition is already present
-      List<String> currentParts = client.listPartitionNames(outputInfo.getDatabaseName(),
-          outputInfo.getTableName(), partitionValues, (short) 1);
+      if (!outputInfo.isDynamicPartitioningUsed()){
+        List<String> partitionValues = HCatOutputCommitter.getPartitionValueList(
+            table, outputInfo.getPartitionValues());
+        // fully-specified partition
+        List<String> currentParts = client.listPartitionNames(outputInfo.getDatabaseName(),
+            outputInfo.getTableName(), partitionValues, (short) 1);
-      if( currentParts.size() > 0 ) {
-        throw new HCatException(ErrorType.ERROR_DUPLICATE_PARTITION);
+        if( currentParts.size() > 0 ) {
+          throw new HCatException(ErrorType.ERROR_DUPLICATE_PARTITION);
+        }
      }
    } else {
+      List<String> partitionValues = HCatOutputCommitter.getPartitionValueList(
+          table, outputInfo.getPartitionValues());
+      // non-partitioned table
+
      Path tablePath = new Path(table.getSd().getLocation());
      FileSystem fs = tablePath.getFileSystem(job.getConfiguration());
@@ -299,24 +383,12 @@ public class HCatOutputFormat extends HC
      getRecordWriter(TaskAttemptContext context
      ) throws IOException, InterruptedException {
-    // First create the RW.
    HCatRecordWriter rw = new HCatRecordWriter(context);
-
-    // Now set permissions and group on freshly created files.
-    OutputJobInfo info = getJobInfo(context);
-    Path workFile = rw.getStorageDriver().getWorkFilePath(context,info.getLocation());
-    Path tblPath = new Path(info.getTable().getSd().getLocation());
-    FileSystem fs = tblPath.getFileSystem(context.getConfiguration());
-    FileStatus tblPathStat = fs.getFileStatus(tblPath);
-    fs.setPermission(workFile, tblPathStat.getPermission());
-    try{
-      fs.setOwner(workFile, null, tblPathStat.getGroup());
-    } catch(AccessControlException ace){
-      // log the messages before ignoring. Currently, logging is not built in HCat.
-    }
+    rw.prepareForStorageDriverOutput(context);
    return rw;
  }
+
  /**
   * Get the output committer for this output format. This is responsible
   * for ensuring the output is committed correctly.
@@ -329,10 +401,17 @@ public class HCatOutputFormat extends HC
  public OutputCommitter getOutputCommitter(TaskAttemptContext context
      ) throws IOException, InterruptedException {
    OutputFormat<? super WritableComparable<?>, ? super Writable> outputFormat = getOutputFormat(context);
-    return new HCatOutputCommitter(outputFormat.getOutputCommitter(context));
+    return new HCatOutputCommitter(context,outputFormat.getOutputCommitter(context));
  }

  static HiveMetaStoreClient createHiveClient(String url, Configuration conf) throws IOException, MetaException {
+    HiveConf hiveConf = getHiveConf(url, conf);
+//    HCatUtil.logHiveConf(LOG, hiveConf);
+    return new HiveMetaStoreClient(hiveConf);
+  }
+
+
+  private static HiveConf getHiveConf(String url, Configuration conf) throws IOException {
    HiveConf hiveConf = new HiveConf(HCatOutputFormat.class);
    if( url != null ) {
@@ -372,9 +451,48 @@ public class HCatOutputFormat extends HC
      }
    }
+
+    // figure out what the maximum number of partitions allowed is, so we can pass it on to our OutputJobInfo
+    if (HCatConstants.HCAT_IS_DYNAMIC_MAX_PTN_CHECK_ENABLED){
+      maxDynamicPartitions = hiveConf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS);
+    }else{
+      maxDynamicPartitions = -1; // disables bounds checking for maximum number of dynamic partitions
+    }
+    harRequested = hiveConf.getBoolVar(HiveConf.ConfVars.HIVEARCHIVEENABLED);
+    return hiveConf;
+  }
-    return new HiveMetaStoreClient(hiveConf);
+  /**
+   * Initializes file paths, and sets permissions and group on freshly created files.
+   * This is called at RecordWriter instantiation time, which can be at write time in
+   * a dynamic partitioning use case.
+   * @param context
+   * @throws IOException
+   */
+  public static void prepareOutputLocation(HCatOutputStorageDriver osd, TaskAttemptContext context) throws IOException {
+    OutputJobInfo info = HCatBaseOutputFormat.getJobInfo(context);
+//    Path workFile = osd.getWorkFilePath(context,info.getLocation());
+    Path workFile = osd.getWorkFilePath(context,context.getConfiguration().get("mapred.output.dir"));
+    Path tblPath = new Path(info.getTable().getSd().getLocation());
+    FileSystem fs = tblPath.getFileSystem(context.getConfiguration());
+    FileStatus tblPathStat = fs.getFileStatus(tblPath);
+
+//    LOG.info("Attempting to set permission ["+tblPathStat.getPermission()+"] on ["+
+//        workFile+"], location=["+info.getLocation()+"] , mapred.locn =["+
+//        context.getConfiguration().get("mapred.output.dir")+"]");
+//
+//    FileStatus wFileStatus = fs.getFileStatus(workFile);
+//    LOG.info("Table : "+tblPathStat.getPath());
+//    LOG.info("Working File : "+wFileStatus.getPath());
+
+    fs.setPermission(workFile, tblPathStat.getPermission());
+    try{
+      fs.setOwner(workFile, null, tblPathStat.getGroup());
+    } catch(AccessControlException ace){
+      // log the messages before ignoring. Currently, logging is not built in HCat.
+    }
  }

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputStorageDriver.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputStorageDriver.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputStorageDriver.java Fri Jul 22 23:38:07 2011
@@ -22,15 +22,23 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus.State;
+import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.security.AccessControlException;
 import org.apache.hcatalog.data.HCatRecord;
 import org.apache.hcatalog.data.schema.HCatSchema;
@@ -40,6 +48,7 @@ import org.apache.hcatalog.data.schema.H
  */
 public abstract class HCatOutputStorageDriver {
+
  /**
   * Initialize the storage driver with specified properties, default implementation does nothing.
   * @param context the job context object
@@ -103,13 +112,22 @@ public abstract class HCatOutputStorageD
   * @param jobContext the job context object
   * @param tableLocation the location of the table
   * @param partitionValues the partition values
+   * @param dynHash A unique hash value that represents the dynamic partitioning job used
   * @return the location String.
   * @throws IOException Signals that an I/O exception has occurred.
   */
  public String getOutputLocation(JobContext jobContext,
-      String tableLocation, List<String> partitionCols, Map<String, String> partitionValues) throws IOException {
+      String tableLocation, List<String> partitionCols, Map<String, String> partitionValues, String dynHash) throws IOException {
+
+    String parentPath = tableLocation;
+    // For dynamic partitioned writes without all key values specified,
+    // we create a temp dir for the associated write job
+    if (dynHash != null){
+      parentPath = new Path(tableLocation, HCatOutputFormat.DYNTEMP_DIR_NAME+dynHash).toString();
+    }
-    if( partitionValues == null || partitionValues.size() == 0 ) {
+    // For non-partitioned tables, we send them to the temp dir
+    if((dynHash == null) && ( partitionValues == null || partitionValues.size() == 0 )) {
      return new Path(tableLocation, HCatOutputFormat.TEMP_DIR_NAME).toString();
    }
@@ -120,7 +138,7 @@ public abstract class HCatOutputStorageD
    String partitionLocation = FileUtils.makePartName(partitionCols, values);
-    Path path = new Path(tableLocation, partitionLocation);
+    Path path = new Path(parentPath, partitionLocation);
    return path.toString();
  }
@@ -130,4 +148,59 @@ public abstract class HCatOutputStorageD
  public Path getWorkFilePath(TaskAttemptContext context, String outputLoc) throws IOException{
    return new Path(new FileOutputCommitter(new Path(outputLoc), context).getWorkPath(),
        FileOutputFormat.getUniqueFile(context, "part",""));
  }
+
+  /**
+   * Implementation that calls the underlying output committer's setupJob,
+   * used in lieu of the underlying committer's setupJob when using dynamic partitioning.
+   * The default implementation should be overridden by underlying implementations
+   * that do not use FileOutputCommitter.
+   * This method exists so that a storage driver implementor can override the
+   * underlying OutputCommitter's setupJob to make it idempotent, since with
+   * dynamic partitioning it may be called multiple times in a job.
+   * It should be safe to call multiple times from individual tasks
+   * without stepping on each other's toes.
+   *
+   * @param context
+   * @throws InterruptedException
+   * @throws IOException
+   */
+  public void setupOutputCommitterJob(TaskAttemptContext context)
+      throws IOException, InterruptedException{
+    getOutputFormat().getOutputCommitter(context).setupJob(context);
+  }
+
+  /**
+   * Implementation that calls the underlying output committer's cleanupJob,
+   * used in lieu of the underlying committer's cleanupJob when using dynamic partitioning.
+   * This should be safe to call after multiple underlying output committers have
+   * written to task dirs inside it.
+   * While the base MR cleanupJob would normally have sufficed, this is provided
+   * so that implementations of setupOutputCommitterJob can clean up properly.
+   *
+   * @param context
+   * @throws IOException
+   */
+  public void cleanupOutputCommitterJob(TaskAttemptContext context)
+      throws IOException, InterruptedException{
+    getOutputFormat().getOutputCommitter(context).cleanupJob(context);
+  }
+
+  /**
+   * Implementation that calls the underlying output committer's abortJob,
+   * used in lieu of the underlying committer's abortJob when using dynamic partitioning.
+   * This should be safe to call after multiple underlying output committers have
+   * written to task dirs inside it.
+   * While the base MR cleanupJob would normally have sufficed, this is provided
+   * so that implementations of setupOutputCommitterJob can abort properly.
+   *
+   * @param context
+   * @param state
+   * @throws IOException
+   */
+  public void abortOutputCommitterJob(TaskAttemptContext context, State state)
+      throws IOException, InterruptedException{
+    getOutputFormat().getOutputCommitter(context).abortJob(context,state);
+  }
+
+
 }

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordWriter.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordWriter.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordWriter.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordWriter.java Fri Jul 22 23:38:07 2011
@@ -18,60 +18,174 @@ package org.apache.hcatalog.mapreduce;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.common.ErrorType;
 import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.HCatRecord;

 public class HCatRecordWriter extends RecordWriter<WritableComparable<?>, HCatRecord> {

  private final HCatOutputStorageDriver storageDriver;
-  /**
-   * @return the storageDriver
-   */
-  public HCatOutputStorageDriver getStorageDriver() {
-    return storageDriver;
-  }
+
+  private boolean dynamicPartitioningUsed = false;
+
+//  static final private Log LOG = LogFactory.getLog(HCatRecordWriter.class);

  private final RecordWriter<? super WritableComparable<?>, ? super Writable> baseWriter;
+  private final Map<Integer, RecordWriter<? super WritableComparable<?>, ? super Writable>> baseDynamicWriters;
+  private final Map<Integer, HCatOutputStorageDriver> baseDynamicStorageDrivers;
+  private final List<Integer> partColsToDel;
+  private final List<Integer> dynamicPartCols;
+  private int maxDynamicPartitions;
+
+  private OutputJobInfo jobInfo;
+  private TaskAttemptContext context;

  public HCatRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
-    OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context);
+    jobInfo = HCatOutputFormat.getJobInfo(context);
+    this.context = context;

    // If partition columns occur in data, we want to remove them.
    partColsToDel = jobInfo.getPosOfPartCols();
+    dynamicPartitioningUsed = jobInfo.getTableInfo().isDynamicPartitioningUsed();
+    dynamicPartCols = jobInfo.getPosOfDynPartCols();
+    maxDynamicPartitions = jobInfo.getMaxDynamicPartitions();

-    if(partColsToDel == null){
+    if((partColsToDel == null) || (dynamicPartitioningUsed && (dynamicPartCols == null))){
      throw new HCatException("It seems that setSchema() is not called on " +
          "HCatOutputFormat. Please make sure that method is called.");
    }
+
+
+    if (!dynamicPartitioningUsed){
+      this.storageDriver = HCatOutputFormat.getOutputDriverInstance(context, jobInfo);
+      this.baseWriter = storageDriver.getOutputFormat().getRecordWriter(context);
+      this.baseDynamicStorageDrivers = null;
+      this.baseDynamicWriters = null;
+    }else{
+      this.baseDynamicStorageDrivers = new HashMap<Integer, HCatOutputStorageDriver>();
+      this.baseDynamicWriters = new HashMap<Integer, RecordWriter<? super WritableComparable<?>, ? super Writable>>();
+      this.storageDriver = null;
+      this.baseWriter = null;
+    }
-    this.storageDriver = HCatOutputFormat.getOutputDriverInstance(context, jobInfo);
-    this.baseWriter = storageDriver.getOutputFormat().getRecordWriter(context);
+  }
+
+  /**
+   * @return the storageDriver
+   */
+  public HCatOutputStorageDriver getStorageDriver() {
+    return storageDriver;
+  }

  @Override
  public void close(TaskAttemptContext context) throws IOException,
      InterruptedException {
+    if (dynamicPartitioningUsed){
+      for (RecordWriter<? super WritableComparable<?>, ? super Writable> bwriter : baseDynamicWriters.values()){
+        bwriter.close(context);
+      }
+      for (HCatOutputStorageDriver osd : baseDynamicStorageDrivers.values()){
+        OutputCommitter baseOutputCommitter = osd.getOutputFormat().getOutputCommitter(context);
+        if (baseOutputCommitter.needsTaskCommit(context)){
+          baseOutputCommitter.commitTask(context);
+        }
+      }
+    } else {
      baseWriter.close(context);
+    }
  }

  @Override
  public void write(WritableComparable<?> key, HCatRecord value) throws IOException,
      InterruptedException {
+    RecordWriter<? super WritableComparable<?>, ? super Writable> localWriter;
+    HCatOutputStorageDriver localDriver;
+
+//    HCatUtil.logList(LOG, "HCatRecord to write", value.getAll());
+
+    if (dynamicPartitioningUsed){
+      // calculate which writer to use from the remaining values - this needs to be done before we delete cols
+
+      List<String> dynamicPartValues = new ArrayList<String>();
+      for (Integer colToAppend : dynamicPartCols){
+        dynamicPartValues.add(value.get(colToAppend).toString());
+      }
+
+      int dynHashCode = dynamicPartValues.hashCode();
+      if (!baseDynamicWriters.containsKey(dynHashCode)){
+//        LOG.info("Creating new storage driver["+baseDynamicStorageDrivers.size()
+//            +"/"+maxDynamicPartitions+ "] for "+dynamicPartValues.toString());
+        if ((maxDynamicPartitions != -1) && (baseDynamicStorageDrivers.size() > maxDynamicPartitions)){
+          throw new HCatException(ErrorType.ERROR_TOO_MANY_DYNAMIC_PTNS,
+              "Number of dynamic partitions being created "
+              + "exceeds configured max allowable partitions["
+              + maxDynamicPartitions
+              + "], increase parameter ["
+              + HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname
+              + "] if needed.");
+        }
+//        HCatUtil.logList(LOG, "dynamicpartvals", dynamicPartValues);
+//        HCatUtil.logList(LOG, "dynamicpartCols", dynamicPartCols);
+
+        HCatOutputStorageDriver localOsd = createDynamicStorageDriver(dynamicPartValues);
+        RecordWriter<? super WritableComparable<?>, ? super Writable> baseRecordWriter
+            = localOsd.getOutputFormat().getRecordWriter(context);
+        localOsd.setupOutputCommitterJob(context);
+        OutputCommitter baseOutputCommitter = localOsd.getOutputFormat().getOutputCommitter(context);
+        baseOutputCommitter.setupTask(context);
+        prepareForStorageDriverOutput(localOsd,context);
+        baseDynamicWriters.put(dynHashCode, baseRecordWriter);
+        baseDynamicStorageDrivers.put(dynHashCode,localOsd);
+      }
+
+      localWriter = baseDynamicWriters.get(dynHashCode);
+      localDriver = baseDynamicStorageDrivers.get(dynHashCode);
+    }else{
+      localWriter = baseWriter;
+      localDriver = storageDriver;
+    }

    for(Integer colToDel : partColsToDel){
      value.remove(colToDel);
    }
-    //The key given by user is ignored
-    WritableComparable<?> generatedKey = storageDriver.generateKey(value);
-    Writable convertedValue = storageDriver.convertValue(value);
-    baseWriter.write(generatedKey, convertedValue);
+
+    //The key given by user is ignored
+    WritableComparable<?> generatedKey = localDriver.generateKey(value);
+    Writable convertedValue = localDriver.convertValue(value);
+    localWriter.write(generatedKey, convertedValue);
+  }
+
+  protected HCatOutputStorageDriver createDynamicStorageDriver(List<String> dynamicPartVals) throws IOException {
+    HCatOutputStorageDriver localOsd = HCatOutputFormat.getOutputDriverInstance(context,jobInfo,dynamicPartVals);
+    return localOsd;
+  }
+
+  public void prepareForStorageDriverOutput(TaskAttemptContext context) throws IOException {
+    // Set permissions and group on freshly created files.
+    if (!dynamicPartitioningUsed){
+      HCatOutputStorageDriver localOsd = this.getStorageDriver();
+      prepareForStorageDriverOutput(localOsd,context);
+    }
+  }
+
+  private void prepareForStorageDriverOutput(HCatOutputStorageDriver localOsd,
+      TaskAttemptContext context) throws IOException {
+    HCatOutputFormat.prepareOutputLocation(localOsd,context);
+  }
 }
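Editor's note on the setOutput() hunk above: with this change a partition spec no longer has to be complete; any table partition keys missing from the user-supplied map become dynamic partitioning keys and are resolved per record at write time. A minimal standalone sketch of that detection step, with hypothetical class and method names that are not part of this commit:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Illustrative only: mirrors the key-detection logic in setOutput().
    public class DynamicKeySketch {

        // Returns the partition keys that were not given static values and must
        // therefore be filled in dynamically, one value per record, at write time.
        static List<String> findDynamicKeys(List<String> tablePartitionKeys,
                                            Map<String, String> specifiedValues) {
            Map<String, String> lowered = new HashMap<String, String>();
            if (specifiedValues != null) {
                for (Map.Entry<String, String> e : specifiedValues.entrySet()) {
                    lowered.put(e.getKey().toLowerCase(), e.getValue());
                }
            }
            List<String> dynamicKeys = new ArrayList<String>();
            for (String key : tablePartitionKeys) {
                if (!lowered.containsKey(key.toLowerCase())) {
                    dynamicKeys.add(key.toLowerCase());
                }
            }
            // If the counts do not add up, values were supplied for keys that are
            // not partition keys of the table at all.
            if (lowered.size() + dynamicKeys.size() != tablePartitionKeys.size()) {
                throw new IllegalArgumentException("Invalid partition keys specified");
            }
            return dynamicKeys;
        }

        public static void main(String[] args) {
            Map<String, String> spec = new HashMap<String, String>();
            spec.put("ds", "20110722");                       // static value for 'ds'
            List<String> keys = Arrays.asList("ds", "region");
            System.out.println(findDynamicKeys(keys, spec));  // prints [region]
        }
    }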
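The getOutputLocation() change routes a write to one of three places: a shared temp dir for unpartitioned tables, a per-job dynamic-partitioning temp dir when a dynHash is present, or the usual key=value partition path. A simplified, Hadoop-free sketch of that selection follows; the directory names "_TEMP" and "_DYN" are assumptions for illustration (the real values come from HCatOutputFormat.TEMP_DIR_NAME and DYNTEMP_DIR_NAME, and the real code escapes partition values via FileUtils.makePartName):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Illustrative only: a pure-string version of the location selection.
    public class OutputLocationSketch {

        static String outputLocation(String tableLocation, List<String> partitionCols,
                                     Map<String, String> partitionValues, String dynHash) {
            // Non-partitioned (and non-dynamic) writes go to a shared temp dir.
            if (dynHash == null && (partitionValues == null || partitionValues.isEmpty())) {
                return tableLocation + "/_TEMP";
            }

            // Dynamic-partitioning jobs write under a per-job temp dir keyed by dynHash.
            String parent = (dynHash != null) ? tableLocation + "/_DYN" + dynHash : tableLocation;

            // Otherwise append the usual key=value partition directories (unescaped here).
            StringBuilder sb = new StringBuilder(parent);
            for (String col : partitionCols) {
                sb.append('/').append(col).append('=').append(partitionValues.get(col));
            }
            return sb.toString();
        }

        public static void main(String[] args) {
            Map<String, String> vals = new HashMap<String, String>();
            vals.put("ds", "20110722");
            vals.put("region", "us");
            System.out.println(outputLocation("/warehouse/t",
                    Arrays.asList("ds", "region"), vals, "0.123"));
            // prints /warehouse/t/_DYN0.123/ds=20110722/region=us
        }
    }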
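Finally, the write() path in the new HCatRecordWriter keeps one base RecordWriter (plus storage driver and committer) per distinct combination of dynamic partition values, keyed by the hash of those values, and refuses to grow past the configured maximum. A stripped-down sketch of that caching pattern, with a plain java.io.Writer standing in for the storage-driver machinery and a made-up file-naming scheme:

    import java.io.FileWriter;
    import java.io.IOException;
    import java.io.Writer;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Illustrative only: the per-partition writer cache used for dynamic partitioning.
    public class DynamicWriterCache {

        private final Map<Integer, Writer> writers = new HashMap<Integer, Writer>();
        private final int maxDynamicPartitions;   // -1 disables the bound, as in the commit

        DynamicWriterCache(int maxDynamicPartitions) {
            this.maxDynamicPartitions = maxDynamicPartitions;
        }

        // The cache key is the hash of the dynamic partition values extracted from
        // the record, computed before the partition columns are removed from it.
        Writer writerFor(List<String> dynamicPartValues) throws IOException {
            int key = dynamicPartValues.hashCode();
            Writer w = writers.get(key);
            if (w == null) {
                if (maxDynamicPartitions != -1 && writers.size() > maxDynamicPartitions) {
                    throw new IOException("too many dynamic partitions: " + writers.size());
                }
                // The real code creates a storage driver and RecordWriter here and runs
                // the underlying committer's setupJob()/setupTask(); we just open a file.
                w = new FileWriter("part-" + Integer.toHexString(key) + ".out");
                writers.put(key, w);
            }
            return w;
        }

        // close() in the real RecordWriter also commits each cached writer's task.
        void closeAll() throws IOException {
            for (Writer w : writers.values()) {
                w.close();
            }
        }
    }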