incubator-hcatalog-commits mailing list archives

From: ga...@apache.org
Subject: svn commit: r1177788 [2/2] - in /incubator/hcatalog/trunk: ./ src/java/org/apache/hcatalog/mapreduce/ src/java/org/apache/hcatalog/pig/ src/java/org/apache/hcatalog/rcfile/ src/test/org/apache/hcatalog/mapreduce/ src/test/org/apache/hcatalog/rcfile/
Date: Fri, 30 Sep 2011 19:23:22 GMT
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java?rev=1177788&r1=1177787&r2=1177788&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java Fri Sep 30 19:23:21 2011
@@ -21,7 +21,6 @@ package org.apache.hcatalog.mapreduce;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -30,10 +29,7 @@ import java.util.Map.Entry;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
@@ -42,18 +38,13 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
 import org.apache.hadoop.hive.thrift.DelegationTokenSelector;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -65,7 +56,6 @@ import org.apache.hcatalog.common.HCatEx
 import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.HCatRecord;
 import org.apache.hcatalog.data.schema.HCatSchema;
-import org.apache.thrift.TException;
 
 /** The OutputFormat to use to write data to HCat. The key value is ignored and
  * should be given as null. The value is the HCatRecord to write.*/
@@ -73,20 +63,7 @@ public class HCatOutputFormat extends HC
 
 //    static final private Log LOG = LogFactory.getLog(HCatOutputFormat.class);
 
-    /** The directory under which data is initially written for a non partitioned table */
-    protected static final String TEMP_DIR_NAME = "_TEMP";
-    
-    /** */
-    protected static final String DYNTEMP_DIR_NAME = "_DYN";
-    
     private static Map<String, Token<? extends AbstractDelegationTokenIdentifier>> tokenMap = new HashMap<String, Token<? extends AbstractDelegationTokenIdentifier>>();
-
-    private static final PathFilter hiddenFileFilter = new PathFilter(){
-      public boolean accept(Path p){
-        String name = p.getName();
-        return !name.startsWith("_") && !name.startsWith(".");
-      }
-    };
     
     private static int maxDynamicPartitions;
     private static boolean harRequested;
@@ -161,9 +138,6 @@ public class HCatOutputFormat extends HC
           outputJobInfo.setPartitionValues(valueMap);
         }
 
-        //Handle duplicate publish
-        handleDuplicatePublish(job, outputJobInfo, client, table);
-
         StorageDescriptor tblSD = table.getSd();
         HCatSchema tableSchema = HCatUtil.extractSchemaFromStorageDescriptor(tblSD);
         StorerInfo storerInfo = InitializeInput.extractStorerInfo(tblSD,table.getParameters());
@@ -211,7 +185,7 @@ public class HCatOutputFormat extends HC
           // new Text() do new Text("oozie") below - if this change is made also
           // remember to do:
           //  job.getConfiguration().set(HCAT_KEY_TOKEN_SIGNATURE, "oozie");
-          // Also change code in HCatOutputCommitter.cleanupJob() to cancel the
+          // Also change code in OutputCommitter.cleanupJob() to cancel the
           // token only if token.service is not "oozie" - remove the condition of
           // HCAT_KEY_TOKEN_SIGNATURE != null in that code.
           Token<? extends TokenIdentifier> token = tokenSelector.selectToken(
@@ -238,8 +212,8 @@ public class HCatOutputFormat extends HC
             String tokenSignature = getTokenSignature(outputJobInfo);
             if(tokenMap.get(tokenSignature) == null) {
               // get delegation tokens from hcat server and store them into the "job"
-              // These will be used in the HCatOutputCommitter to publish partitions to
-              // hcat
+              // These will be used to publish partitions to hcat, normally in
+              // OutputCommitter.commitJob()
               // when the JobTracker in Hadoop MapReduce starts supporting renewal of 
               // arbitrary tokens, the renewer should be the principal of the JobTracker
               tokenMap.put(tokenSignature, HCatUtil.extractThriftToken(
@@ -312,61 +286,6 @@ public class HCatOutputFormat extends HC
     }
 
     /**
-     * Handles duplicate publish of partition. Fails if partition already exists.
-     * For non partitioned tables, fails if files are present in table directory.
-     * For dynamic partitioned publish, does nothing - check would need to be done at recordwriter time
-     * @param job the job
-     * @param outputInfo the output info
-     * @param client the metastore client
-     * @param table the table being written to
-     * @throws IOException
-     * @throws MetaException
-     * @throws TException
-     */
-    private static void handleDuplicatePublish(Job job, OutputJobInfo outputInfo,
-        HiveMetaStoreClient client, Table table) throws IOException, MetaException, TException {
-
-      /*
-       * For fully specified ptn, follow strict checks for existence of partitions in metadata
-       * For unpartitioned tables, follow filechecks
-       * For partially specified tables:
-       *    This would then need filechecks at the start of a ptn write,
-       *    Doing metadata checks can get potentially very expensive (fat conf) if 
-       *    there are a large number of partitions that match the partial specifications
-       */
-
-      if( table.getPartitionKeys().size() > 0 ) {
-        if (!outputInfo.isDynamicPartitioningUsed()){
-          List<String> partitionValues = HCatOutputCommitter.getPartitionValueList(
-              table, outputInfo.getPartitionValues());
-          // fully-specified partition
-          List<String> currentParts = client.listPartitionNames(outputInfo.getDatabaseName(),
-              outputInfo.getTableName(), partitionValues, (short) 1);
-
-          if( currentParts.size() > 0 ) {
-            throw new HCatException(ErrorType.ERROR_DUPLICATE_PARTITION);
-          }
-        }
-      } else {
-        List<String> partitionValues = HCatOutputCommitter.getPartitionValueList(
-            table, outputInfo.getPartitionValues());
-        // non-partitioned table
-        
-        Path tablePath = new Path(table.getSd().getLocation());
-        FileSystem fs = tablePath.getFileSystem(job.getConfiguration());
-
-        if ( fs.exists(tablePath) ) {
-          FileStatus[] status = fs.globStatus(new Path(tablePath, "*"), hiddenFileFilter);
-
-          if( status.length > 0 ) {
-            throw new HCatException(ErrorType.ERROR_NON_EMPTY_TABLE,
-                      table.getDbName() + "." + table.getTableName());
-          }
-        }
-      }
-    }
-
-    /**
      * Set the schema for the data being written out to the partition. The
      * table schema is used by default for the partition if this is not called.
      * @param job the job object
@@ -391,10 +310,7 @@ public class HCatOutputFormat extends HC
     public RecordWriter<WritableComparable<?>, HCatRecord>
       getRecordWriter(TaskAttemptContext context
                       ) throws IOException, InterruptedException {
-
-      HCatRecordWriter rw = new HCatRecordWriter(context);
-      rw.prepareForStorageDriverOutput(context);
-      return rw;
+      return getOutputFormat(context).getRecordWriter(context);
     }
 
 
@@ -409,8 +325,7 @@ public class HCatOutputFormat extends HC
     @Override
     public OutputCommitter getOutputCommitter(TaskAttemptContext context
                                        ) throws IOException, InterruptedException {
-        OutputFormat<? super WritableComparable<?>, ? super Writable> outputFormat = getOutputFormat(context);
-        return new HCatOutputCommitter(context,outputFormat.getOutputCommitter(context));
+        return getOutputFormat(context).getOutputCommitter(context);
     }
 
     static HiveMetaStoreClient createHiveClient(String url, Configuration conf) throws IOException, MetaException {
@@ -420,7 +335,7 @@ public class HCatOutputFormat extends HC
     }
 
 
-    private static HiveConf getHiveConf(String url, Configuration conf) throws IOException {
+    static HiveConf getHiveConf(String url, Configuration conf) throws IOException {
       HiveConf hiveConf = new HiveConf(HCatOutputFormat.class);
 
       if( url != null ) {
@@ -477,37 +392,4 @@ public class HCatOutputFormat extends HC
       return hiveConf;
     }
 
-    /**
-     * Any initialization of file paths, set permissions and group on freshly created files
-     * This is called at RecordWriter instantiation time which can be at write-time for
-     * a dynamic partitioning usecase
-     * @param context
-     * @throws IOException
-     */
-    public static void prepareOutputLocation(HCatOutputStorageDriver osd, TaskAttemptContext context) throws IOException {
-      OutputJobInfo info =  HCatBaseOutputFormat.getJobInfo(context);
-//      Path workFile = osd.getWorkFilePath(context,info.getLocation());
-      Path workFile = osd.getWorkFilePath(context,context.getConfiguration().get("mapred.output.dir"));
-      Path tblPath = new Path(info.getTableInfo().getTable().getSd().getLocation());
-      FileSystem fs = tblPath.getFileSystem(context.getConfiguration());
-      FileStatus tblPathStat = fs.getFileStatus(tblPath);
-      
-//      LOG.info("Attempting to set permission ["+tblPathStat.getPermission()+"] on ["+
-//          workFile+"], location=["+info.getLocation()+"] , mapred.locn =["+
-//          context.getConfiguration().get("mapred.output.dir")+"]"); 
-//
-//      FileStatus wFileStatus = fs.getFileStatus(workFile);
-//      LOG.info("Table : "+tblPathStat.getPath());
-//      LOG.info("Working File : "+wFileStatus.getPath());
-      
-      fs.setPermission(workFile, tblPathStat.getPermission());
-      try{
-        fs.setOwner(workFile, null, tblPathStat.getGroup());
-      } catch(AccessControlException ace){
-        // log the messages before ignoring. Currently, logging is not built in HCat.
-      }
-    }
-
-
-
 }

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputStorageDriver.java?rev=1177788&r1=1177787&r2=1177788&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputStorageDriver.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputStorageDriver.java Fri Sep 30 19:23:21 2011
@@ -17,28 +17,17 @@
 package org.apache.hcatalog.mapreduce;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FsStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobStatus.State;
-import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.security.AccessControlException;
 import org.apache.hcatalog.data.HCatRecord;
 import org.apache.hcatalog.data.schema.HCatSchema;
 
@@ -118,35 +107,13 @@ public abstract class HCatOutputStorageD
      */
     public String getOutputLocation(JobContext jobContext,
            String tableLocation, List<String> partitionCols, Map<String, String> partitionValues, String dynHash) throws IOException {
-      
-      String parentPath = tableLocation;
-      // For dynamic partitioned writes without all keyvalues specified, 
-      // we create a temp dir for the associated write job
-      if (dynHash != null){
-        parentPath = new Path(tableLocation, HCatOutputFormat.DYNTEMP_DIR_NAME+dynHash).toString();
-      }
-
-      // For non-partitioned tables, we send them to the temp dir
-      if((dynHash == null) && ( partitionValues == null || partitionValues.size() == 0 )) {
-        return new Path(tableLocation, HCatOutputFormat.TEMP_DIR_NAME).toString();
-      }
-
-      List<String> values = new ArrayList<String>();
-      for(String partitionCol : partitionCols) {
-        values.add(partitionValues.get(partitionCol));
-      }
-
-      String partitionLocation = FileUtils.makePartName(partitionCols, values);
-
-      Path path = new Path(parentPath, partitionLocation);
-      return path.toString();
+      return null;
     }
 
-    /** Default implementation assumes FileOutputFormat. Storage drivers wrapping
-     * other OutputFormats should override this method.
+    /** Storage drivers wrapping other OutputFormats should override this method.
      */
     public Path getWorkFilePath(TaskAttemptContext context, String outputLoc) throws IOException{
-      return new Path(new FileOutputCommitter(new Path(outputLoc), context).getWorkPath(), FileOutputFormat.getUniqueFile(context, "part",""));
+      return null;
     }
 
     /**
@@ -202,5 +169,13 @@ public abstract class HCatOutputStorageD
       getOutputFormat().getOutputCommitter(context).abortJob(context,state);
     }
 
-    
+    /**
+     * Return an instance of OutputFormatContainer containing the passed OutputFormat.
+     * DefaultOutputFormatContainer is returned by default.
+     * @param outputFormat the OutputFormat the returned container will contain
+     * @return an OutputFormatContainer wrapping the given OutputFormat
+     */
+    OutputFormatContainer getOutputFormatContainer(OutputFormat outputFormat) {
+        return new DefaultOutputFormatContainer(outputFormat);
+    }
+
 }

Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputCommitterContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputCommitterContainer.java?rev=1177788&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputCommitterContainer.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputCommitterContainer.java Fri Sep 30 19:23:21 2011
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+
+/**
+ *  This class will contain an implementation of an OutputCommitter.
+ *  See {@link OutputFormatContainer} for more information about containers.
+ */
+abstract class OutputCommitterContainer extends OutputCommitter {
+    private final OutputCommitter committer;
+
+    /**
+     * @param context current JobContext
+     * @param committer OutputCommitter that this instance will contain
+     */
+    public OutputCommitterContainer(JobContext context, OutputCommitter committer) {
+        this.committer = committer;
+    }
+
+    /**
+     * @return underlying OutputCommitter
+     */
+    public OutputCommitter getBaseOutputCommitter() {
+        return committer;
+    }
+
+}
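OutputCommitterContainer above only captures the wrapped committer; concrete subclasses still have to supply the OutputCommitter methods themselves. As a minimal, hypothetical sketch (not part of this commit, and necessarily living in org.apache.hcatalog.mapreduce since the abstract class is package-private), a pass-through subclass could simply forward every call to the contained committer:

package org.apache.hcatalog.mapreduce;

import java.io.IOException;

import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

// Hypothetical example only: forwards every OutputCommitter call to the wrapped committer.
class ForwardingOutputCommitterContainer extends OutputCommitterContainer {

    public ForwardingOutputCommitterContainer(JobContext context, OutputCommitter committer) {
        super(context, committer);
    }

    @Override
    public void setupJob(JobContext context) throws IOException {
        getBaseOutputCommitter().setupJob(context);
    }

    @Override
    public void setupTask(TaskAttemptContext context) throws IOException {
        getBaseOutputCommitter().setupTask(context);
    }

    @Override
    public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
        return getBaseOutputCommitter().needsTaskCommit(context);
    }

    @Override
    public void commitTask(TaskAttemptContext context) throws IOException {
        getBaseOutputCommitter().commitTask(context);
    }

    @Override
    public void abortTask(TaskAttemptContext context) throws IOException {
        getBaseOutputCommitter().abortTask(context);
    }
}

A real committer container would layer the HCatalog work described in the OutputFormatContainer javadoc (metastore updates, token handling) around these delegating calls.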

Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputFormatContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputFormatContainer.java?rev=1177788&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputFormatContainer.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputFormatContainer.java Fri Sep 30 19:23:21 2011
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hcatalog.data.HCatRecord;
+
+/**
+ *  This container class is used to wrap OutputFormat implementations and augment them with
+ *  behavior necessary to work with HCatalog (e.g. metastore updates, HCatalog delegation tokens).
+ *  Containers are also used to provide storage-specific implementations of some HCatalog features
+ *  (e.g. dynamic partitioning).
+ *  Hence users wishing to create storage-specific implementations of HCatalog features should
+ *  extend this class and override HCatOutputStorageDriver.getOutputFormatContainer() to return the implementation.
+ *  By default DefaultOutputFormatContainer is used, which implements only the bare minimum;
+ *  HCatalog features such as partitioning are not supported.
+ */
+abstract class OutputFormatContainer extends OutputFormat<WritableComparable<?>, HCatRecord> {
+    private OutputFormat<? super WritableComparable<?>, ? super Writable> of;
+
+    /**
+     * @param of OutputFormat this instance will contain
+     */
+    public OutputFormatContainer(OutputFormat<? super WritableComparable<?>,? super Writable> of) {
+        this.of = of;
+    }
+
+    /**
+     * @return underlying OutputFormat
+     */
+    public OutputFormat getBaseOutputFormat() {
+        return of;
+    }
+
+}
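OutputFormatContainer defines the wrapping contract but leaves the three OutputFormat methods to concrete subclasses. To make that contract concrete, here is a purely delegating sketch; it is hypothetical, uses raw types for brevity, and is not the DefaultOutputFormatContainer mentioned in the javadoc (whose source is presumably in the other half of this commit):

package org.apache.hcatalog.mapreduce;

import java.io.IOException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hcatalog.data.HCatRecord;

// Hypothetical example only: a container that just hands calls through to the wrapped format.
class PassThroughOutputFormatContainer extends OutputFormatContainer {

    public PassThroughOutputFormatContainer(
            OutputFormat<? super WritableComparable<?>, ? super Writable> of) {
        super(of);
    }

    @Override
    @SuppressWarnings("unchecked")
    public RecordWriter<WritableComparable<?>, HCatRecord> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        // Hand back the wrapped format's writer; a real container would wrap it in a
        // RecordWriterContainer to add HCatalog behavior such as dynamic partitioning.
        return getBaseOutputFormat().getRecordWriter(context);
    }

    @Override
    @SuppressWarnings("unchecked")
    public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
        getBaseOutputFormat().checkOutputSpecs(context);
    }

    @Override
    public OutputCommitter getOutputCommitter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        // Likewise, a real container would wrap this in an OutputCommitterContainer
        // so that job commit can publish partitions to the metastore.
        return getBaseOutputFormat().getOutputCommitter(context);
    }
}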

Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/RecordWriterContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/RecordWriterContainer.java?rev=1177788&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/RecordWriterContainer.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/RecordWriterContainer.java Fri Sep 30 19:23:21 2011
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.data.HCatRecord;
+
+/**
+ *  This class will contain an implementation of a RecordWriter.
+ *  See {@link OutputFormatContainer} for more information about containers.
+ */
+abstract class RecordWriterContainer extends RecordWriter<WritableComparable<?>, HCatRecord> {
+
+    private final RecordWriter<? super WritableComparable<?>, ? super Writable> baseRecordWriter;
+
+    /**
+     * @param context current JobContext
+     * @param baseRecordWriter RecordWriter that this instance will contain
+     */
+    public RecordWriterContainer(TaskAttemptContext context,
+                                 RecordWriter<? super WritableComparable<?>, ? super Writable> baseRecordWriter) {
+        this.baseRecordWriter = baseRecordWriter;
+    }
+
+    /**
+     * @return underlying RecordWriter
+     */
+    public RecordWriter getBaseRecordWriter() {
+        return baseRecordWriter;
+    }
+
+}
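Again purely for illustration, a minimal concrete writer container could forward write() and close() to the wrapped writer. The class below is a hypothetical sketch, not part of this commit; raw delegation keeps it short:

package org.apache.hcatalog.mapreduce;

import java.io.IOException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hcatalog.data.HCatRecord;

// Hypothetical example only: forwards each record to the contained RecordWriter.
class ForwardingRecordWriterContainer extends RecordWriterContainer {

    public ForwardingRecordWriterContainer(TaskAttemptContext context,
            RecordWriter<? super WritableComparable<?>, ? super Writable> baseRecordWriter) {
        super(context, baseRecordWriter);
    }

    @Override
    @SuppressWarnings("unchecked")
    public void write(WritableComparable<?> key, HCatRecord value)
            throws IOException, InterruptedException {
        // A storage-specific container could route the record to a per-partition
        // writer here before delegating; this sketch just forwards the call.
        getBaseRecordWriter().write(key, value);
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        getBaseRecordWriter().close(context);
    }
}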

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatEximStorer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatEximStorer.java?rev=1177788&r1=1177787&r2=1177788&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatEximStorer.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatEximStorer.java Fri Sep 30 19:23:21 2011
@@ -142,7 +142,7 @@ public class HCatEximStorer extends HCat
   @Override
  public void storeSchema(ResourceSchema schema, String arg1, Job job) throws IOException {
    if( job.getConfiguration().get("mapred.job.tracker", "").equalsIgnoreCase("local") ) {
-      //In local mode, mapreduce will not call HCatOutputCommitter.cleanupJob.
+      //In local mode, mapreduce will not call OutputCommitter.cleanupJob.
       //Calling it from here so that the partition publish happens.
       //This call needs to be removed after MAPREDUCE-1447 is fixed.
       new HCatEximOutputCommitter(job,null).cleanupJob(job);

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java?rev=1177788&r1=1177787&r2=1177788&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java Fri Sep 30 19:23:21 2011
@@ -23,13 +23,16 @@ import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hcatalog.common.HCatConstants;
 import org.apache.hcatalog.common.HCatException;
 import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.schema.HCatSchema;
-import org.apache.hcatalog.mapreduce.HCatOutputCommitter;
 import org.apache.hcatalog.mapreduce.HCatOutputFormat;
+import org.apache.hcatalog.mapreduce.HCatOutputStorageDriver;
 import org.apache.hcatalog.mapreduce.OutputJobInfo;
 import org.apache.pig.PigException;
 import org.apache.pig.ResourceSchema;
@@ -141,10 +144,16 @@ public class HCatStorer extends HCatBase
   @Override
  public void storeSchema(ResourceSchema schema, String arg1, Job job) throws IOException {
    if( job.getConfiguration().get("mapred.job.tracker", "").equalsIgnoreCase("local") ) {
-      //In local mode, mapreduce will not call HCatOutputCommitter.cleanupJob.
+      try {
+      //In local mode, mapreduce will not call OutputCommitter.cleanupJob.
       //Calling it from here so that the partition publish happens.
       //This call needs to be removed after MAPREDUCE-1447 is fixed.
-      new HCatOutputCommitter(job,null).cleanupJob(job);
+        getOutputFormat().getOutputCommitter(new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID())).cleanupJob(job);
+      } catch (IOException e) {
+        throw new IOException("Failed to cleanup job",e);
+      } catch (InterruptedException e) {
+        throw new IOException("Failed to cleanup job",e);
+      }
     }
   }
 }

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/rcfile/RCFileOutputDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/rcfile/RCFileOutputDriver.java?rev=1177788&r1=1177787&r2=1177788&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/rcfile/RCFileOutputDriver.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/rcfile/RCFileOutputDriver.java Fri Sep 30 19:23:21 2011
@@ -45,17 +45,16 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.HCatRecord;
 import org.apache.hcatalog.data.schema.HCatFieldSchema;
 import org.apache.hcatalog.data.schema.HCatSchema;
-import org.apache.hcatalog.mapreduce.HCatOutputStorageDriver;
+import org.apache.hcatalog.mapreduce.FileOutputStorageDriver;
 
 /**
  * The storage driver for writing RCFile data through HCatOutputFormat.
  */
- public class RCFileOutputDriver extends HCatOutputStorageDriver {
+ public class RCFileOutputDriver extends FileOutputStorageDriver {
 
    /** The serde for serializing the HCatRecord to bytes writable */
    private SerDe serde;

Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java?rev=1177788&r1=1177787&r2=1177788&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java (original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java Fri Sep 30 19:23:21 2011
@@ -58,10 +58,6 @@ import org.apache.hcatalog.data.DefaultH
 import org.apache.hcatalog.data.HCatRecord;
 import org.apache.hcatalog.data.schema.HCatFieldSchema;
 import org.apache.hcatalog.data.schema.HCatSchema;
-import org.apache.hcatalog.mapreduce.HCatInputFormat;
-import org.apache.hcatalog.mapreduce.HCatOutputCommitter;
-import org.apache.hcatalog.mapreduce.HCatOutputFormat;
-import org.apache.hcatalog.mapreduce.HCatTableInfo;
 import org.apache.hcatalog.rcfile.RCFileInputDriver;
 import org.apache.hcatalog.rcfile.RCFileOutputDriver;
 
@@ -273,9 +269,8 @@ public abstract class HCatMapReduceTest 
 
     HCatOutputFormat.setSchema(job, new HCatSchema(partitionColumns));
 
-    //new HCatOutputCommitter(null).setupJob(job);
     job.waitForCompletion(true);
-    new HCatOutputCommitter(job,null).cleanupJob(job);
+    new FileOutputCommitterContainer(job,null).cleanupJob(job);
     if (assertWrite){
       // we assert only if we expected to assert with this call.
       Assert.assertEquals(writeCount, MapCreate.writeCount);

Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java?rev=1177788&r1=1177787&r2=1177788&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java (original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java Fri Sep 30 19:23:21 2011
@@ -134,7 +134,7 @@ public class TestHCatDynamicPartitioned 
 
   public void testHCatDynamicPartitionMaxPartitions() throws Exception {
     HiveConf hc = new HiveConf(this.getClass());
-    
+
     int maxParts = hiveConf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS);
     System.out.println("Max partitions allowed = " + maxParts);
 

Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java?rev=1177788&r1=1177787&r2=1177788&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java (original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java Fri Sep 30 19:23:21 2011
@@ -41,12 +41,6 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hcatalog.common.HCatConstants;
-import org.apache.hcatalog.mapreduce.HCatOutputCommitter;
-import org.apache.hcatalog.mapreduce.HCatOutputFormat;
-import org.apache.hcatalog.mapreduce.HCatTableInfo;
-import org.apache.hcatalog.mapreduce.InitializeInput;
-import org.apache.hcatalog.mapreduce.OutputJobInfo;
-import org.apache.hcatalog.mapreduce.StorerInfo;
 import org.apache.hcatalog.rcfile.RCFileOutputDriver;
 
 public class TestHCatOutputFormat extends TestCase {
@@ -159,7 +153,7 @@ public class TestHCatOutputFormat extend
   }
 
   public void publishTest(Job job) throws Exception {
-    OutputCommitter committer = new HCatOutputCommitter(job,null);
+    OutputCommitter committer = new FileOutputCommitterContainer(job,null);
     committer.cleanupJob(job);
 
     Partition part = client.getPartition(dbName, tblName, Arrays.asList("p1"));

Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/rcfile/TestRCFileOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/rcfile/TestRCFileOutputStorageDriver.java?rev=1177788&r1=1177787&r2=1177788&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/rcfile/TestRCFileOutputStorageDriver.java (original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/rcfile/TestRCFileOutputStorageDriver.java Fri Sep 30 19:23:21 2011
@@ -30,12 +30,15 @@ import org.apache.hadoop.hive.serde2.col
 import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hcatalog.common.ErrorType;
+import org.apache.hcatalog.common.HCatConstants;
 import org.apache.hcatalog.common.HCatException;
 import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.HCatRecord;
 import org.apache.hcatalog.data.schema.HCatSchema;
 import org.apache.hcatalog.mapreduce.HCatInputStorageDriver;
 import org.apache.hcatalog.mapreduce.HCatOutputStorageDriver;
+import org.apache.hcatalog.mapreduce.OutputJobInfo;
 import org.apache.hcatalog.rcfile.RCFileInputDriver;
 import org.apache.hcatalog.rcfile.RCFileOutputDriver;
 
@@ -44,6 +47,8 @@ public class TestRCFileOutputStorageDriv
   public void testConversion() throws IOException {
     Configuration conf = new Configuration();
     JobContext jc = new JobContext(conf, new JobID());
+    String jobString = HCatUtil.serialize(OutputJobInfo.create(null,null,null,null,null));
+    jc.getConfiguration().set(HCatConstants.HCAT_KEY_OUTPUT_INFO,jobString);
 
     HCatSchema schema = buildHiveSchema();
     HCatInputStorageDriver isd = new RCFileInputDriver();


