incubator-hcatalog-commits mailing list archives

From: ga...@apache.org
Subject: svn commit: r1177788 [1/2] - in /incubator/hcatalog/trunk: ./ src/java/org/apache/hcatalog/mapreduce/ src/java/org/apache/hcatalog/pig/ src/java/org/apache/hcatalog/rcfile/ src/test/org/apache/hcatalog/mapreduce/ src/test/org/apache/hcatalog/rcfile/
Date: Fri, 30 Sep 2011 19:23:22 GMT
Author: gates
Date: Fri Sep 30 19:23:21 2011
New Revision: 1177788

URL: http://svn.apache.org/viewvc?rev=1177788&view=rev
Log:
HCAT-60 Refactor HCatalog to support non-filebased outputformats

Added:
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultOutputFormatContainer.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultRecordWriterContainer.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputFormatContainer.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputStorageDriver.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputCommitterContainer.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputFormatContainer.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/RecordWriterContainer.java
Removed:
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputCommitter.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputCommitter.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordWriter.java
Modified:
    incubator/hcatalog/trunk/CHANGES.txt
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputCommitter.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputFormat.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputStorageDriver.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatEximStorer.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/rcfile/RCFileOutputDriver.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/rcfile/TestRCFileOutputStorageDriver.java
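
The abstract OutputFormatContainer, OutputCommitterContainer, and RecordWriterContainer base classes are listed under Added above, but their diffs presumably appear in part 2/2 of this message and are not shown here. A minimal sketch of the wrapper contract implied by the subclasses in this part, with names and generics inferred from usage rather than taken from the actual source, might look like the following:

    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.mapreduce.OutputFormat;
    import org.apache.hcatalog.data.HCatRecord;

    // Hypothetical sketch of the container contract: hold the storage library's
    // own OutputFormat and expose it to subclasses, which adapt it to HCatRecord
    // output (DefaultOutputFormatContainer, FileOutputFormatContainer below).
    abstract class OutputFormatContainerSketch
            extends OutputFormat<WritableComparable<?>, HCatRecord> {

        // the wrapped, storage-specific OutputFormat
        private final OutputFormat<? super WritableComparable<?>, ? super Writable> baseOutputFormat;

        protected OutputFormatContainerSketch(
                OutputFormat<? super WritableComparable<?>, ? super Writable> of) {
            this.baseOutputFormat = of;
        }

        // subclasses delegate to the contained format through this accessor
        protected OutputFormat<? super WritableComparable<?>, ? super Writable> getBaseOutputFormat() {
            return baseOutputFormat;
        }
    }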

Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1177788&r1=1177787&r2=1177788&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Fri Sep 30 19:23:21 2011
@@ -23,6 +23,8 @@ Trunk (unreleased changes)
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
+  HCAT-60. Refactor HCatalog to support non-filebased outputformats (toffer via gates)
+
   HCAT-63. RPM package integration with Hadoop (khorgath via hashutosh)
 
   IMPROVEMENTS

Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java?rev=1177788&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java Fri Sep 30 19:23:21 2011
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.JobStatus.State;
+import org.apache.hcatalog.common.HCatConstants;
+
+/**
+ * Part of the DefaultOutput*Container classes
+ * See {@link DefaultOutputFormatContainer} for more information
+ */
+class DefaultOutputCommitterContainer extends OutputCommitterContainer {
+
+    /**
+     * @param context current JobContext
+     * @param baseCommitter OutputCommitter to contain
+     * @throws IOException
+     */
+    public DefaultOutputCommitterContainer(JobContext context, OutputCommitter baseCommitter) throws IOException {
+        super(context,baseCommitter);
+    }
+
+    @Override
+    public void abortTask(TaskAttemptContext context) throws IOException {
+        getBaseOutputCommitter().abortTask(context);
+    }
+
+    @Override
+    public void commitTask(TaskAttemptContext context) throws IOException {
+        getBaseOutputCommitter().commitTask(context);
+    }
+
+    @Override
+    public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
+        return getBaseOutputCommitter().needsTaskCommit(context);
+    }
+
+    @Override
+    public void setupJob(JobContext context) throws IOException {
+        getBaseOutputCommitter().setupJob(context);
+    }
+
+    @Override
+    public void setupTask(TaskAttemptContext context) throws IOException {
+        getBaseOutputCommitter().setupTask(context);
+    }
+
+    @Override
+    public void abortJob(JobContext jobContext, State state) throws IOException {
+        getBaseOutputCommitter().abortJob(jobContext, state);
+    }
+
+    @Override
+    public void commitJob(JobContext jobContext) throws IOException {
+        getBaseOutputCommitter().commitJob(jobContext);
+    }
+
+    @Override
+    public void cleanupJob(JobContext context) throws IOException {
+        getBaseOutputCommitter().cleanupJob(context);
+
+        OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context);
+
+        //Cancel HCat and JobTracker tokens
+        try {
+            HiveMetaStoreClient client = HCatOutputFormat.createHiveClient(jobInfo.getServerUri(), context.getConfiguration());
+            String tokenStrForm = client.getTokenStrForm();
+            if(tokenStrForm != null && context.getConfiguration().get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
+              client.cancelDelegationToken(tokenStrForm);
+            }
+        } catch (Exception e) {
+            throw new IOException("Failed to cancel delegation token",e);
+        }
+    }
+}

Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultOutputFormatContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultOutputFormatContainer.java?rev=1177788&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultOutputFormatContainer.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultOutputFormatContainer.java Fri Sep 30 19:23:21 2011
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.data.HCatRecord;
+
+import java.io.IOException;
+
+/**
+ * Bare-bones implementation of OutputFormatContainer. Performs only the tasks
+ * required to work correctly with HCatalog. HCatalog features that require a
+ * storage-specific implementation (e.g. partitioning) are unsupported.
+ */
+class DefaultOutputFormatContainer extends OutputFormatContainer {
+
+    public DefaultOutputFormatContainer(OutputFormat<WritableComparable<?>, Writable> of) {
+        super(of);
+    }
+
+
+    /**
+     * Get the record writer for the job. Wraps the record writer obtained from
+     * the base OutputFormat in a DefaultRecordWriterContainer.
+     * @param context the information about the current task.
+     * @return a RecordWriter to write the output for the job.
+     * @throws IOException
+     */
+    @Override
+    public RecordWriter<WritableComparable<?>, HCatRecord>
+    getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
+        return new DefaultRecordWriterContainer(context, getBaseOutputFormat().getRecordWriter(context));
+    }
+
+
+    /**
+     * Get the output committer for this output format. This is responsible
+     * for ensuring the output is committed correctly.
+     * @param context the task context
+     * @return an output committer
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    @Override
+    public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+            throws IOException, InterruptedException {
+        OutputFormat<? super WritableComparable<?>, ? super Writable> outputFormat = getBaseOutputFormat();
+        return new DefaultOutputCommitterContainer(context, outputFormat.getOutputCommitter(context));
+    }
+
+    /**
+     * Check for validity of the output-specification for the job.
+     * @param context information about the job
+     * @throws IOException when output should not be attempted
+     */
+    @Override
+    public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
+        OutputFormat<? super WritableComparable<?>, ? super Writable> outputFormat = getBaseOutputFormat();
+        outputFormat.checkOutputSpecs(context);
+    }
+
+}

Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultRecordWriterContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultRecordWriterContainer.java?rev=1177788&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultRecordWriterContainer.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultRecordWriterContainer.java Fri Sep 30 19:23:21 2011
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.data.HCatRecord;
+
+/**
+ * Part of the DefaultOutput*Container classes
+ * See {@link DefaultOutputFormatContainer} for more information
+ */
+class DefaultRecordWriterContainer extends RecordWriterContainer {
+
+    private final HCatOutputStorageDriver storageDriver;
+    private final RecordWriter<? super WritableComparable<?>, ? super Writable> baseRecordWriter;
+    private final OutputJobInfo jobInfo;
+
+    /**
+     * @param context current JobContext
+     * @param baseRecordWriter RecordWriter to contain
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    public DefaultRecordWriterContainer(TaskAttemptContext context,
+                                        RecordWriter<? super WritableComparable<?>, ? super Writable> baseRecordWriter) throws IOException, InterruptedException {
+        super(context,baseRecordWriter);
+        jobInfo = HCatOutputFormat.getJobInfo(context);
+        this.storageDriver = HCatOutputFormat.getOutputDriverInstance(context, jobInfo);
+        this.baseRecordWriter = baseRecordWriter;
+    }
+
+    @Override
+    public void close(TaskAttemptContext context) throws IOException,
+            InterruptedException {
+        baseRecordWriter.close(context);
+    }
+
+    @Override
+    public void write(WritableComparable<?> key, HCatRecord value) throws IOException,
+            InterruptedException {
+        WritableComparable<?> generatedKey = storageDriver.generateKey(value);
+        Writable convertedValue = storageDriver.convertValue(value);
+        baseRecordWriter.write(generatedKey, convertedValue);
+    }
+
+    @Override
+    public RecordWriter<? super WritableComparable<?>, ? super Writable> getBaseRecordWriter() {
+        return baseRecordWriter;
+    }
+}
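
The write() method above shows the division of labor: the container owns the HCatRecord, while the storage driver supplies the output key and the storage-specific value. A standalone illustration of that conversion step, using a hypothetical helper rather than a real HCatOutputStorageDriver (which has additional abstract methods not shown in this part of the diff), could look like:

    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hcatalog.data.HCatRecord;

    // Hypothetical converter mirroring the two driver calls made in write():
    // generateKey(value) and convertValue(value).
    class RecordConversionSketch {

        // Many file formats ignore the key, so a driver may simply emit a null key.
        WritableComparable<?> generateKey(HCatRecord value) {
            return NullWritable.get();
        }

        // Render the record in the wrapped format's value type; Text is used here
        // purely as a stand-in for a storage-specific representation.
        Writable convertValue(HCatRecord value) {
            return new Text(String.valueOf(value));
        }
    }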

Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java?rev=1177788&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java Fri Sep 30 19:23:21 2011
@@ -0,0 +1,690 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus.State;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hcatalog.common.ErrorType;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+import org.apache.hcatalog.har.HarOutputCommitterPostProcessor;
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * Part of the FileOutput*Container classes
+ * See {@link FileOutputFormatContainer} for more information
+ */
+class FileOutputCommitterContainer extends OutputCommitterContainer {
+
+    private final boolean dynamicPartitioningUsed;
+    private boolean partitionsDiscovered;
+
+    private Map<String, Map<String, String>> partitionsDiscoveredByPath;
+    private Map<String, HCatOutputStorageDriver> storageDriversDiscoveredByPath;
+
+    HarOutputCommitterPostProcessor harProcessor = new HarOutputCommitterPostProcessor();
+
+    private String ptnRootLocation = null;
+
+    private OutputJobInfo jobInfo = null;
+
+    /**
+     * @param context current JobContext
+     * @param baseCommitter OutputCommitter to contain
+     * @throws IOException
+     */
+    public FileOutputCommitterContainer(JobContext context,
+                                        OutputCommitter baseCommitter) throws IOException {
+        super(context, baseCommitter);
+        jobInfo = HCatOutputFormat.getJobInfo(context);
+        dynamicPartitioningUsed = jobInfo.isDynamicPartitioningUsed();
+
+        this.partitionsDiscovered = !dynamicPartitioningUsed;
+    }
+
+    @Override
+    public void abortTask(TaskAttemptContext context) throws IOException {
+        if (!dynamicPartitioningUsed){
+            getBaseOutputCommitter().abortTask(context);
+        }
+    }
+
+    @Override
+    public void commitTask(TaskAttemptContext context) throws IOException {
+        if (!dynamicPartitioningUsed){
+            getBaseOutputCommitter().commitTask(context);
+        }else{
+            // called explicitly through FileRecordWriterContainer.close() if dynamic
+        }
+    }
+
+    @Override
+    public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
+        if (!dynamicPartitioningUsed){
+            return getBaseOutputCommitter().needsTaskCommit(context);
+        }else{
+            // called explicitly through FileRecordWriterContainer.close() if dynamic - return false by default
+            return false;
+        }
+    }
+
+    @Override
+    public void setupJob(JobContext context) throws IOException {
+        if(getBaseOutputCommitter() != null && !dynamicPartitioningUsed) {
+            getBaseOutputCommitter().setupJob(context);
+        }
+        // in dynamic usecase, called through FileRecordWriterContainer
+    }
+
+    @Override
+    public void setupTask(TaskAttemptContext context) throws IOException {
+        if (!dynamicPartitioningUsed){
+            getBaseOutputCommitter().setupTask(context);
+        }else{
+            // called explicitly through FileRecordWriterContainer.write() if dynamic
+        }
+    }
+
+    @Override
+    public void abortJob(JobContext jobContext, State state) throws IOException {
+        if (dynamicPartitioningUsed){
+            discoverPartitions(jobContext);
+        }
+
+        if(getBaseOutputCommitter() != null && !dynamicPartitioningUsed) {
+            getBaseOutputCommitter().abortJob(jobContext, state);
+        }
+        else if (dynamicPartitioningUsed){
+            for(HCatOutputStorageDriver baseOsd : storageDriversDiscoveredByPath.values()){
+                try {
+                    baseOsd.abortOutputCommitterJob(
+                            new TaskAttemptContext(
+                                    jobContext.getConfiguration(), TaskAttemptID.forName(ptnRootLocation)
+                            ),state);
+                } catch (Exception e) {
+                    throw new IOException(e);
+                }
+            }
+        }
+
+        OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext);
+
+        try {
+            HiveMetaStoreClient client = HCatOutputFormat.createHiveClient(
+                    jobInfo.getServerUri(), jobContext.getConfiguration());
+            // cancel the deleg. tokens that were acquired for this job now that
+            // we are done - we should cancel if the tokens were acquired by
+            // HCatOutputFormat and not if they were supplied by Oozie. In the latter
+            // case the HCAT_KEY_TOKEN_SIGNATURE property in the conf will not be set
+            String tokenStrForm = client.getTokenStrForm();
+            if(tokenStrForm != null && jobContext.getConfiguration().get
+                    (HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
+                client.cancelDelegationToken(tokenStrForm);
+            }
+
+            if (harProcessor.isEnabled()) {
+                String jcTokenStrForm = jobContext.getConfiguration().get(HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_STRFORM);
+                String jcTokenSignature = jobContext.getConfiguration().get(HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_SIGNATURE);
+                if(jcTokenStrForm != null && jcTokenSignature != null) {
+                    HCatUtil.cancelJobTrackerDelegationToken(tokenStrForm,jcTokenSignature);
+                }
+            }
+
+        } catch(Exception e) {
+            if( e instanceof HCatException ) {
+                throw (HCatException) e;
+            } else {
+                throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, e);
+            }
+        }
+
+        Path src;
+        if (dynamicPartitioningUsed){
+            src = new Path(getPartitionRootLocation(
+                    jobInfo.getLocation().toString(),jobInfo.getTableInfo().getTable().getPartitionKeysSize()
+            ));
+        }else{
+            src = new Path(jobInfo.getLocation());
+        }
+        FileSystem fs = src.getFileSystem(jobContext.getConfiguration());
+//      LOG.warn("abortJob about to delete ["+src.toString() +"]");
+        fs.delete(src, true);
+    }
+
+    public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
+    static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
+            "mapreduce.fileoutputcommitter.marksuccessfuljobs";
+
+    private static boolean getOutputDirMarking(Configuration conf) {
+        return conf.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER,
+                false);
+    }
+
+    @Override
+    public void commitJob(JobContext jobContext) throws IOException {
+        if (dynamicPartitioningUsed){
+            discoverPartitions(jobContext);
+        }
+        if(getBaseOutputCommitter() != null && !dynamicPartitioningUsed) {
+            getBaseOutputCommitter().commitJob(jobContext);
+        }
+        // create _SUCCESS file if so requested.
+        OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext);
+        if(getOutputDirMarking(jobContext.getConfiguration())) {
+            Path outputPath = new Path(jobInfo.getLocation());
+            if (outputPath != null) {
+                FileSystem fileSys = outputPath.getFileSystem(jobContext.getConfiguration());
+                // create a file in the folder to mark it
+                if (fileSys.exists(outputPath)) {
+                    Path filePath = new Path(outputPath, SUCCEEDED_FILE_NAME);
+                    if(!fileSys.exists(filePath)) { // may have been created by baseCommitter.commitJob()
+                        fileSys.create(filePath).close();
+                    }
+                }
+            }
+        }
+        cleanupJob(jobContext);
+    }
+
+    @Override
+    public void cleanupJob(JobContext context) throws IOException {
+        if (dynamicPartitioningUsed){
+            discoverPartitions(context);
+        }
+
+
+        OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context);
+        Configuration conf = context.getConfiguration();
+        Table table = jobInfo.getTableInfo().getTable();
+        Path tblPath = new Path(table.getSd().getLocation());
+        FileSystem fs = tblPath.getFileSystem(conf);
+
+        if( table.getPartitionKeys().size() == 0 ) {
+            //non partitioned table
+            if(getBaseOutputCommitter() != null && !dynamicPartitioningUsed) {
+                getBaseOutputCommitter().cleanupJob(context);
+            }
+            else if (dynamicPartitioningUsed){
+                for(HCatOutputStorageDriver baseOsd : storageDriversDiscoveredByPath.values()){
+                    try {
+                        baseOsd.cleanupOutputCommitterJob(
+                                new TaskAttemptContext(
+                                        context.getConfiguration(), TaskAttemptID.forName(ptnRootLocation)
+                                ));
+                    } catch (Exception e) {
+                        throw new IOException(e);
+                    }
+                }
+            }
+
+            //Move data from temp directory to the actual table directory
+            //No metastore operation required.
+            Path src = new Path(jobInfo.getLocation());
+            moveTaskOutputs(fs, src, src, tblPath, false);
+            fs.delete(src, true);
+            return;
+        }
+
+        HiveMetaStoreClient client = null;
+        HCatTableInfo tableInfo = jobInfo.getTableInfo();
+
+        List<Partition> partitionsAdded = new ArrayList<Partition>();
+
+        try {
+            client = HCatOutputFormat.createHiveClient(jobInfo.getServerUri(), conf);
+
+            StorerInfo storer = InitializeInput.extractStorerInfo(table.getSd(),table.getParameters());
+
+            updateTableSchema(client, table, jobInfo.getOutputSchema());
+
+            FileStatus tblStat = fs.getFileStatus(tblPath);
+            String grpName = tblStat.getGroup();
+            FsPermission perms = tblStat.getPermission();
+
+            List<Partition> partitionsToAdd = new ArrayList<Partition>();
+            if (!dynamicPartitioningUsed){
+                partitionsToAdd.add(
+                        constructPartition(
+                                context,
+                                tblPath.toString(), jobInfo.getPartitionValues()
+                                ,jobInfo.getOutputSchema(), getStorerParameterMap(storer)
+                                ,table, fs
+                                ,grpName,perms));
+            }else{
+                for (Entry<String,Map<String,String>> entry : partitionsDiscoveredByPath.entrySet()){
+                    partitionsToAdd.add(
+                            constructPartition(
+                                    context,
+                                    getPartitionRootLocation(entry.getKey(),entry.getValue().size()), entry.getValue()
+                                    ,jobInfo.getOutputSchema(), getStorerParameterMap(storer)
+                                    ,table, fs
+                                    ,grpName,perms));
+                }
+            }
+
+            //Publish the new partition(s)
+            if (dynamicPartitioningUsed && harProcessor.isEnabled() && (!partitionsToAdd.isEmpty())){
+
+                Path src = new Path(ptnRootLocation);
+
+                // check here for each dir we're copying out, to see if it already exists, error out if so
+                moveTaskOutputs(fs, src, src, tblPath,true);
+
+                moveTaskOutputs(fs, src, src, tblPath,false);
+                fs.delete(src, true);
+
+
+//          for (Partition partition : partitionsToAdd){
+//            partitionsAdded.add(client.add_partition(partition));
+//            // currently following add_partition instead of add_partitions because latter isn't
+//            // all-or-nothing and we want to be able to roll back partitions we added if need be.
+//          }
+
+                try {
+                    client.add_partitions(partitionsToAdd);
+                    partitionsAdded = partitionsToAdd;
+                } catch (Exception e){
+                    // There was an error adding partitions : rollback fs copy and rethrow
+                    for (Partition p : partitionsToAdd){
+                        Path ptnPath = new Path(harProcessor.getParentFSPath(new Path(p.getSd().getLocation())));
+                        if (fs.exists(ptnPath)){
+                            fs.delete(ptnPath,true);
+                        }
+                    }
+                    throw e;
+                }
+
+            }else{
+                // no harProcessor, regular operation
+
+                // No duplicate-partition publish case to worry about because we'll
+                // get an AlreadyExistsException here if so, and roll back appropriately
+
+                client.add_partitions(partitionsToAdd);
+                partitionsAdded = partitionsToAdd;
+
+                if (dynamicPartitioningUsed && (partitionsAdded.size()>0)){
+                    Path src = new Path(ptnRootLocation);
+                    moveTaskOutputs(fs, src, src, tblPath,false);
+                    fs.delete(src, true);
+                }
+
+            }
+
+            if(getBaseOutputCommitter() != null && !dynamicPartitioningUsed) {
+                getBaseOutputCommitter().cleanupJob(context);
+            }
+
+            //Cancel HCat and JobTracker tokens
+            // cancel the deleg. tokens that were acquired for this job now that
+            // we are done - we should cancel if the tokens were acquired by
+            // HCatOutputFormat and not if they were supplied by Oozie. In the latter
+            // case the HCAT_KEY_TOKEN_SIGNATURE property in the conf will not be set
+            String tokenStrForm = client.getTokenStrForm();
+            if(tokenStrForm != null && context.getConfiguration().get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
+              client.cancelDelegationToken(tokenStrForm);
+            }
+            if(harProcessor.isEnabled()) {
+                String jcTokenStrForm =
+                  context.getConfiguration().get(HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_STRFORM);
+                String jcTokenSignature =
+                  context.getConfiguration().get(HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_SIGNATURE);
+                if(jcTokenStrForm != null && jcTokenSignature != null) {
+                  HCatUtil.cancelJobTrackerDelegationToken(tokenStrForm,jcTokenSignature);
+                }
+            }
+        } catch (Exception e) {
+
+            if( partitionsAdded.size() > 0 ) {
+                try {
+                    //baseCommitter.cleanupJob failed, try to clean up the metastore
+                    for (Partition p : partitionsAdded){
+                        client.dropPartition(tableInfo.getDatabaseName(),
+                                tableInfo.getTableName(), p.getValues());
+                    }
+                } catch(Exception te) {
+                    //Keep cause as the original exception
+                    throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, e);
+                }
+            }
+
+            if( e instanceof HCatException ) {
+                throw (HCatException) e;
+            } else {
+                throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, e);
+            }
+        } finally {
+            if( client != null ) {
+                client.close();
+            }
+        }
+    }
+
+    private String getPartitionRootLocation(String ptnLocn,int numPtnKeys) {
+        if (ptnRootLocation  == null){
+            // we only need to calculate it once, it'll be the same for other partitions in this job.
+            Path ptnRoot = new Path(ptnLocn);
+            for (int i = 0; i < numPtnKeys; i++){
+//          LOG.info("Getting parent of "+ptnRoot.getName());
+                ptnRoot = ptnRoot.getParent();
+            }
+            ptnRootLocation = ptnRoot.toString();
+        }
+//      LOG.info("Returning final parent : "+ptnRootLocation);
+        return ptnRootLocation;
+    }
+
+    /**
+     * Generate the partition metadata object used to register the new partition with the metastore.
+     * @param partLocnRoot The table-equivalent location root of the partition
+     *                       (temporary dir if dynamic partition, table dir if static)
+     * @param partKVs The keyvalue pairs that form the partition
+     * @param outputSchema The output schema for the partition
+     * @param params The parameters to store inside the partition
+     * @param table The Table metadata object under which this Partition will reside
+     * @param fs FileSystem object to operate on the underlying filesystem
+     * @param grpName Group name that owns the table dir
+     * @param perms FsPermission that's the default permission of the table dir.
+     * @return Constructed Partition metadata object
+     * @throws java.io.IOException
+     */
+
+    private Partition constructPartition(
+            JobContext context,
+            String partLocnRoot, Map<String,String> partKVs,
+            HCatSchema outputSchema, Map<String, String> params,
+            Table table, FileSystem fs,
+            String grpName, FsPermission perms) throws IOException {
+
+        StorageDescriptor tblSD = table.getSd();
+
+        Partition partition = new Partition();
+        partition.setDbName(table.getDbName());
+        partition.setTableName(table.getTableName());
+        partition.setSd(new StorageDescriptor(tblSD));
+
+        List<FieldSchema> fields = new ArrayList<FieldSchema>();
+        for(HCatFieldSchema fieldSchema : outputSchema.getFields()) {
+            fields.add(HCatSchemaUtils.getFieldSchema(fieldSchema));
+        }
+
+        partition.getSd().setCols(fields);
+
+        partition.setValues(FileOutputFormatContainer.getPartitionValueList(table, partKVs));
+
+        partition.setParameters(params);
+
+        // Sets permissions and group name on partition dirs.
+
+        Path partPath = new Path(partLocnRoot);
+        for(FieldSchema partKey : table.getPartitionKeys()){
+            partPath = constructPartialPartPath(partPath, partKey.getName().toLowerCase(), partKVs);
+//        LOG.info("Setting perms for "+partPath.toString());
+            fs.setPermission(partPath, perms);
+            try{
+                fs.setOwner(partPath, null, grpName);
+            } catch(AccessControlException ace){
+                // log the message before ignoring. Currently, logging is not yet built into HCatalog.
+//          LOG.warn(ace);
+            }
+        }
+        if (dynamicPartitioningUsed){
+            String dynamicPartitionDestination = getFinalDynamicPartitionDestination(table,partKVs);
+            if (harProcessor.isEnabled()){
+                harProcessor.exec(context, partition, partPath);
+                partition.getSd().setLocation(
+                        harProcessor.getProcessedLocation(new Path(dynamicPartitionDestination)));
+            }else{
+                partition.getSd().setLocation(dynamicPartitionDestination);
+            }
+        }else{
+            partition.getSd().setLocation(partPath.toString());
+        }
+
+        return partition;
+    }
+
+
+
+    private String getFinalDynamicPartitionDestination(Table table, Map<String,String> partKVs) {
+        // file:///tmp/hcat_junit_warehouse/employee/_DYN0.7770480401313761/emp_country=IN/emp_state=KA  ->
+        // file:///tmp/hcat_junit_warehouse/employee/emp_country=IN/emp_state=KA
+        Path partPath = new Path(table.getSd().getLocation());
+        for(FieldSchema partKey : table.getPartitionKeys()){
+            partPath = constructPartialPartPath(partPath, partKey.getName().toLowerCase(), partKVs);
+        }
+        return partPath.toString();
+    }
+
+    private Map<String, String> getStorerParameterMap(StorerInfo storer) {
+        Map<String, String> params = new HashMap<String, String>();
+        params.put(HCatConstants.HCAT_ISD_CLASS, storer.getInputSDClass());
+        params.put(HCatConstants.HCAT_OSD_CLASS, storer.getOutputSDClass());
+
+        //Copy table level hcat.* keys to the partition
+        for(Entry<Object, Object> entry : storer.getProperties().entrySet()) {
+            params.put(entry.getKey().toString(), entry.getValue().toString());
+        }
+        return params;
+    }
+
+    private Path constructPartialPartPath(Path partialPath, String partKey, Map<String,String> partKVs){
+
+        StringBuilder sb = new StringBuilder(FileUtils.escapePathName(partKey));
+        sb.append("=");
+        sb.append(FileUtils.escapePathName(partKVs.get(partKey)));
+        return new Path(partialPath, sb.toString());
+    }
+
+    /**
+     * Update the table schema, adding any new columns introduced by the partition schema.
+     * @param client the metastore client
+     * @param table the table to update
+     * @param partitionSchema the schema of the partition
+     * @throws java.io.IOException if an I/O error occurs
+     * @throws org.apache.hadoop.hive.metastore.api.InvalidOperationException if the alter operation is invalid
+     * @throws org.apache.hadoop.hive.metastore.api.MetaException on metastore errors
+     * @throws org.apache.thrift.TException on Thrift communication errors
+     */
+    private void updateTableSchema(HiveMetaStoreClient client, Table table,
+                                   HCatSchema partitionSchema) throws IOException, InvalidOperationException, MetaException, TException {
+
+        List<FieldSchema> newColumns = HCatUtil.validatePartitionSchema(table, partitionSchema);
+
+        if( newColumns.size() != 0 ) {
+            List<FieldSchema> tableColumns = new ArrayList<FieldSchema>(table.getSd().getCols());
+            tableColumns.addAll(newColumns);
+
+            //Update table schema to add the newly added columns
+            table.getSd().setCols(tableColumns);
+            client.alter_table(table.getDbName(), table.getTableName(), table);
+        }
+    }
+
+    /**
+     * Move all of the files from the temp directory to the final location
+     * @param fs the output file system
+     * @param file the file to move
+     * @param src the source directory
+     * @param dest the target directory
+     * @param dryRun - a flag that simply tests if this move would succeed or not based
+     *                 on whether other files exist where we're trying to copy
+     * @throws java.io.IOException
+     */
+    private void moveTaskOutputs(FileSystem fs,
+                                 Path file,
+                                 Path src,
+                                 Path dest, boolean dryRun) throws IOException {
+        if (fs.isFile(file)) {
+            Path finalOutputPath = getFinalPath(file, src, dest);
+
+            if (dryRun){
+//        LOG.info("Testing if moving ["+file+"] to ["+finalOutputPath+"] would cause a problem");
+                if (fs.exists(finalOutputPath)){
+                    throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Data already exists in " + finalOutputPath + ", duplicate publish possible.");
+                }
+            }else{
+//        LOG.info("Moving ["+file+"] to ["+finalOutputPath+"]");
+                if (!fs.rename(file, finalOutputPath)) {
+                    if (!fs.delete(finalOutputPath, true)) {
+                        throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Failed to delete existing path " + finalOutputPath);
+                    }
+                    if (!fs.rename(file, finalOutputPath)) {
+                        throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Failed to move output to " + dest);
+                    }
+                }
+            }
+        } else if(fs.getFileStatus(file).isDir()) {
+            FileStatus[] paths = fs.listStatus(file);
+            Path finalOutputPath = getFinalPath(file, src, dest);
+            if (!dryRun){
+                fs.mkdirs(finalOutputPath);
+            }
+            if (paths != null) {
+                for (FileStatus path : paths) {
+                    moveTaskOutputs(fs, path.getPath(), src, dest,dryRun);
+                }
+            }
+        }
+    }
+
+    /**
+     * Find the final name of a given output file, given the output directory
+     * and the work directory.
+     * @param file the file to move
+     * @param src the source directory
+     * @param dest the target directory
+     * @return the final path for the specific output file
+     * @throws java.io.IOException
+     */
+    private Path getFinalPath(Path file, Path src,
+                              Path dest) throws IOException {
+        URI taskOutputUri = file.toUri();
+        URI relativePath = src.toUri().relativize(taskOutputUri);
+        if (taskOutputUri == relativePath) {
+            throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Can not get the relative path: base = " +
+                    src + " child = " + file);
+        }
+        if (relativePath.getPath().length() > 0) {
+            return new Path(dest, relativePath.getPath());
+        } else {
+            return dest;
+        }
+    }
+
+    /**
+     * Discover the dynamic partitions generated by this job, if they have not been discovered already
+     */
+    private void discoverPartitions(JobContext context) throws IOException {
+        if (!partitionsDiscovered){
+            //      LOG.info("discover ptns called");
+
+            OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context);
+
+            harProcessor.setEnabled(jobInfo.getHarRequested());
+
+            List<Integer> dynamicPartCols = jobInfo.getPosOfDynPartCols();
+            int maxDynamicPartitions = jobInfo.getMaxDynamicPartitions();
+
+            Path loadPath = new Path(jobInfo.getLocation());
+            FileSystem fs = loadPath.getFileSystem(context.getConfiguration());
+
+            // construct a path pattern (e.g., /*/*) to find all dynamically generated paths
+
+            String dynPathSpec = loadPath.toUri().getPath();
+            dynPathSpec = dynPathSpec.replaceAll("__HIVE_DEFAULT_PARTITION__", "*");
+            // TODO : replace this with a param pull from HiveConf
+
+            //      LOG.info("Searching for "+dynPathSpec);
+            Path pathPattern = new Path(loadPath, dynPathSpec);
+            FileStatus[] status = fs.globStatus(pathPattern);
+
+            partitionsDiscoveredByPath = new LinkedHashMap<String,Map<String, String>>();
+            storageDriversDiscoveredByPath = new LinkedHashMap<String,HCatOutputStorageDriver>();
+
+
+            if (status.length == 0) {
+                //        LOG.warn("No partition found generated by dynamic partitioning in ["
+                //            +loadPath+"] with depth["+jobInfo.getTable().getPartitionKeysSize()
+                //            +"], dynSpec["+dynPathSpec+"]");
+            }else{
+                if ((maxDynamicPartitions != -1) && (status.length > maxDynamicPartitions)){
+                    this.partitionsDiscovered = true;
+                    throw new HCatException(ErrorType.ERROR_TOO_MANY_DYNAMIC_PTNS,
+                            "Number of dynamic partitions being created "
+                                    + "exceeds configured max allowable partitions["
+                                    + maxDynamicPartitions
+                                    + "], increase parameter ["
+                                    + HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname
+                                    + "] if needed.");
+                }
+
+                for (FileStatus st : status){
+                    LinkedHashMap<String, String> fullPartSpec = new LinkedHashMap<String, String>();
+                    Warehouse.makeSpecFromName(fullPartSpec, st.getPath());
+                    partitionsDiscoveredByPath.put(st.getPath().toString(),fullPartSpec);
+                    storageDriversDiscoveredByPath.put(st.getPath().toString(),
+                            HCatOutputFormat.getOutputDriverInstance(context, jobInfo, fullPartSpec));
+                }
+            }
+
+            //      for (Entry<String,Map<String,String>> spec : partitionsDiscoveredByPath.entrySet()){
+            //        LOG.info("Partition "+ spec.getKey());
+            //        for (Entry<String,String> e : spec.getValue().entrySet()){
+            //          LOG.info(e.getKey() + "=>" +e.getValue());
+            //        }
+            //      }
+
+            this.partitionsDiscovered = true;
+        }
+    }
+
+}
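
For dynamic partitioning, discoverPartitions() above turns each directory found under the _DYN scratch location into a partition key/value map (via Warehouse.makeSpecFromName), and getFinalDynamicPartitionDestination() rebuilds the permanent location under the table directory from that map. A self-contained sketch of the path-to-spec mapping, using a simplified stand-in rather than the Hive Warehouse implementation and the example paths from the comments above:

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class DynamicPartitionPathSketch {

        // Simplified stand-in for Warehouse.makeSpecFromName: split a partition
        // subpath such as "emp_country=IN/emp_state=KA" into an ordered key/value
        // map (the real implementation also unescapes path components).
        static Map<String, String> specFromRelativePath(String relativePath) {
            Map<String, String> spec = new LinkedHashMap<String, String>();
            for (String component : relativePath.split("/")) {
                int eq = component.indexOf('=');
                if (eq > 0) {
                    spec.put(component.substring(0, eq), component.substring(eq + 1));
                }
            }
            return spec;
        }

        public static void main(String[] args) {
            // Data is first written under a scratch dir like
            // .../employee/_DYN0.7770480401313761/emp_country=IN/emp_state=KA
            Map<String, String> spec = specFromRelativePath("emp_country=IN/emp_state=KA");
            System.out.println(spec);   // {emp_country=IN, emp_state=KA}

            // The final destination is the same key=value chain under the table dir.
            StringBuilder dest = new StringBuilder("/tmp/hcat_junit_warehouse/employee");
            for (Map.Entry<String, String> e : spec.entrySet()) {
                dest.append('/').append(e.getKey()).append('=').append(e.getValue());
            }
            System.out.println(dest);   // /tmp/hcat_junit_warehouse/employee/emp_country=IN/emp_state=KA
        }
    }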

Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputFormatContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputFormatContainer.java?rev=1177788&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputFormatContainer.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputFormatContainer.java Fri Sep 30 19:23:21 2011
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.common.ErrorType;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * File-based storage (e.g. RCFile, Text) implementation of OutputFormatContainer.
+ * This implementation supports HCatalog features such as partitioning, dynamic partitioning, and Hadoop Archiving.
+ */
+class FileOutputFormatContainer extends OutputFormatContainer {
+    private OutputFormat<? super WritableComparable<?>, ? super Writable> of;
+
+    private static final PathFilter hiddenFileFilter = new PathFilter(){
+      public boolean accept(Path p){
+        String name = p.getName();
+        return !name.startsWith("_") && !name.startsWith(".");
+      }
+    };
+
+    /**
+     * @param of base OutputFormat to contain
+     */
+    public FileOutputFormatContainer(OutputFormat<? super WritableComparable<?>, ? super Writable> of) {
+        super(of);
+        this.of = of;
+    }
+
+    @Override
+    public RecordWriter<WritableComparable<?>, HCatRecord> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
+        return new FileRecordWriterContainer(of.getRecordWriter(context),context);
+    }
+
+    @Override
+    public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
+        OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context);
+        try {
+            handleDuplicatePublish(context,
+                    jobInfo,
+                    HCatOutputFormat.createHiveClient(null,context.getConfiguration()),
+                    jobInfo.getTableInfo().getTable());
+        } catch (MetaException e) {
+            throw new IOException(e);
+        } catch (TException e) {
+            throw new IOException(e);
+        }
+        of.checkOutputSpecs(context);
+    }
+
+    @Override
+    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
+        return new FileOutputCommitterContainer(context,of.getOutputCommitter(context));
+    }
+
+    /**
+     * Handles duplicate publish of a partition. Fails if the partition already exists.
+     * For non-partitioned tables, fails if files are present in the table directory.
+     * For dynamic partitioned publish, does nothing; the check would need to be done at record-writer time.
+     * @param context the job
+     * @param outputInfo the output info
+     * @param client the metastore client
+     * @param table the table being written to
+     * @throws IOException
+     * @throws org.apache.hadoop.hive.metastore.api.MetaException
+     * @throws org.apache.thrift.TException
+     */
+    private static void handleDuplicatePublish(JobContext context, OutputJobInfo outputInfo,
+                                               HiveMetaStoreClient client, Table table) throws IOException, MetaException, TException {
+
+        /*
+        * For a fully specified partition, do strict checks for the partition's existence in the metastore.
+        * For unpartitioned tables, do file checks.
+        * For partially specified tables (dynamic partitioning):
+        *    file checks would be needed at the start of each partition write;
+        *    doing metadata checks can get very expensive (fat conf) if
+        *    a large number of partitions match the partial specification.
+        */
+
+        if( table.getPartitionKeys().size() > 0 ) {
+            if (!outputInfo.isDynamicPartitioningUsed()){
+                List<String> partitionValues = getPartitionValueList(
+                        table, outputInfo.getPartitionValues());
+                // fully-specified partition
+                List<String> currentParts = client.listPartitionNames(outputInfo.getDatabaseName(),
+                        outputInfo.getTableName(), partitionValues, (short) 1);
+
+                if( currentParts.size() > 0 ) {
+                    throw new HCatException(ErrorType.ERROR_DUPLICATE_PARTITION);
+                }
+            }
+        } else {
+            List<String> partitionValues = getPartitionValueList(
+                    table, outputInfo.getPartitionValues());
+            // non-partitioned table
+
+            Path tablePath = new Path(table.getSd().getLocation());
+            FileSystem fs = tablePath.getFileSystem(context.getConfiguration());
+
+            if ( fs.exists(tablePath) ) {
+                FileStatus[] status = fs.globStatus(new Path(tablePath, "*"), hiddenFileFilter);
+
+                if( status.length > 0 ) {
+                    throw new HCatException(ErrorType.ERROR_NON_EMPTY_TABLE,
+                            table.getDbName() + "." + table.getTableName());
+                }
+            }
+        }
+    }
+
+    /**
+     * Convert the partition value map to a value list in the partition key order.
+     * @param table the table being written to
+     * @param valueMap the partition value map
+     * @return the partition value list
+     * @throws java.io.IOException
+     */
+    static List<String> getPartitionValueList(Table table, Map<String, String> valueMap) throws IOException {
+
+        if( valueMap.size() != table.getPartitionKeys().size() ) {
+            throw new HCatException(ErrorType.ERROR_INVALID_PARTITION_VALUES,
+                    "Table "
+                            + table.getTableName() + " has " +
+                            table.getPartitionKeys().size() + " partition keys, got "+
+                            valueMap.size());
+        }
+
+        List<String> values = new ArrayList<String>();
+
+        for(FieldSchema schema : table.getPartitionKeys()) {
+            String value = valueMap.get(schema.getName().toLowerCase());
+
+            if( value == null ) {
+                throw new HCatException(ErrorType.ERROR_MISSING_PARTITION_KEY,
+                        "Key " + schema.getName() + " of table " + table.getTableName());
+            }
+
+            values.add(value);
+        }
+
+        return values;
+    }
+}
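
getPartitionValueList() above re-orders the caller-supplied partition value map to match the table's partition-key order, which is the form the listPartitionNames call above expects. A minimal usage sketch with a hypothetical table and values (the class and method are package-private, so the sketch must live in the org.apache.hcatalog.mapreduce package):

    package org.apache.hcatalog.mapreduce;

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.hive.metastore.api.FieldSchema;
    import org.apache.hadoop.hive.metastore.api.Table;

    public class PartitionValueOrderSketch {
        public static void main(String[] args) throws Exception {
            // Hypothetical table with two partition keys, in declaration order.
            Table table = new Table();
            table.setTableName("employee");
            table.setPartitionKeys(Arrays.asList(
                    new FieldSchema("emp_country", "string", null),
                    new FieldSchema("emp_state", "string", null)));

            // Caller-supplied values, keyed by lower-cased partition column name.
            Map<String, String> valueMap = new HashMap<String, String>();
            valueMap.put("emp_state", "KA");
            valueMap.put("emp_country", "IN");

            // Values come back re-ordered to match the partition keys: [IN, KA]
            System.out.println(FileOutputFormatContainer.getPartitionValueList(table, valueMap));
        }
    }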

Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputStorageDriver.java?rev=1177788&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputStorageDriver.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputStorageDriver.java Fri Sep 30 19:23:21 2011
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.security.AccessControlException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Base class for file-based OutputStorageDrivers to extend. Provides subclasses
+ * with common mechanisms such as dynamic partitioning, partition registration,
+ * and success-file creation, so that they do not have to reimplement them.
+ */
+public abstract class FileOutputStorageDriver extends HCatOutputStorageDriver {
+
+    /** The directory under which data is initially written for a dynamically partitioned table */
+    protected static final String DYNTEMP_DIR_NAME = "_DYN";
+
+    /** The directory under which data is initially written for a non-partitioned table */
+    protected static final String TEMP_DIR_NAME = "_TEMP";
+    private OutputFormat<WritableComparable<?>, ? super Writable> outputFormat;
+
+
+    @Override
+    public void initialize(JobContext jobContext, Properties hcatProperties) throws IOException {
+        super.initialize(jobContext, hcatProperties);
+    }
+
+    @Override
+    public final String getOutputLocation(JobContext jobContext,
+                                    String tableLocation, List<String> partitionCols, Map<String, String> partitionValues, String dynHash) throws IOException {
+        String parentPath = tableLocation;
+        // For dynamically partitioned writes where not all key values are specified,
+        // we create a temp dir for the associated write job
+        if (dynHash != null){
+            parentPath = new Path(tableLocation, DYNTEMP_DIR_NAME+dynHash).toString();
+        }
+
+        // For non-partitioned tables, the data is sent to the temp dir
+        if((dynHash == null) && ( partitionValues == null || partitionValues.size() == 0 )) {
+            return new Path(tableLocation, TEMP_DIR_NAME).toString();
+        }
+
+        List<String> values = new ArrayList<String>();
+        for(String partitionCol : partitionCols) {
+            values.add(partitionValues.get(partitionCol));
+        }
+
+        String partitionLocation = FileUtils.makePartName(partitionCols, values);
+
+        Path path = new Path(parentPath, partitionLocation);
+        return path.toString();
+    }
+
+    @Override
+    public final Path getWorkFilePath(TaskAttemptContext context, String outputLoc) throws IOException{
+        return new Path(new FileOutputCommitter(new Path(outputLoc), context).getWorkPath(), org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.getUniqueFile(context, "part", ""));
+    }
+
+    /**
+     * Initializes the output location: sets permissions and group on freshly created
+     * files to match the table directory. This is called at RecordWriter instantiation
+     * time, which can be at write time for a dynamic partitioning use case.
+     * @param context the task attempt context
+     * @throws IOException
+     */
+    static void prepareOutputLocation(HCatOutputStorageDriver osd, TaskAttemptContext context) throws IOException {
+      OutputJobInfo info =  HCatBaseOutputFormat.getJobInfo(context);
+//      Path workFile = osd.getWorkFilePath(context,info.getLocation());
+      Path workFile = osd.getWorkFilePath(context,context.getConfiguration().get("mapred.output.dir"));
+      Path tblPath = new Path(info.getTableInfo().getTable().getSd().getLocation());
+      FileSystem fs = tblPath.getFileSystem(context.getConfiguration());
+      FileStatus tblPathStat = fs.getFileStatus(tblPath);
+
+//      LOG.info("Attempting to set permission ["+tblPathStat.getPermission()+"] on ["+
+//          workFile+"], location=["+info.getLocation()+"] , mapred.locn =["+
+//          context.getConfiguration().get("mapred.output.dir")+"]");
+//
+//      FileStatus wFileStatus = fs.getFileStatus(workFile);
+//      LOG.info("Table : "+tblPathStat.getPath());
+//      LOG.info("Working File : "+wFileStatus.getPath());
+
+      fs.setPermission(workFile, tblPathStat.getPermission());
+      try{
+        fs.setOwner(workFile, null, tblPathStat.getGroup());
+      } catch(AccessControlException ace){
+        // Ideally we would log this before ignoring it, but logging is not wired into HCat yet.
+      }
+    }
+
+    @Override
+    OutputFormatContainer getOutputFormatContainer(OutputFormat outputFormat) {
+        return new FileOutputFormatContainer(outputFormat);
+    }
+}
+

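To make the temp-dir conventions above concrete: a minimal sketch of the locations getOutputLocation() resolves to for static-partition, dynamic-partition, and non-partitioned writes. The warehouse path, partition columns, values, and dynHash below are made up and not part of this patch.

    import java.util.Arrays;
    import java.util.List;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.common.FileUtils;

    public class OutputLocationSketch {
        public static void main(String[] args) {
            String tableLocation = "hdfs://namenode:8020/user/hive/warehouse/demo_table";
            List<String> partCols = Arrays.asList("ds", "region");
            List<String> partVals = Arrays.asList("20110930", "us");

            // Static partition write: data lands directly under the partition directory.
            System.out.println(new Path(tableLocation,
                    FileUtils.makePartName(partCols, partVals)));
            // .../demo_table/ds=20110930/region=us

            // Dynamic partition write: same layout, rooted under a per-job _DYN<hash> directory.
            Path dynParent = new Path(tableLocation, "_DYN" + "1234567890");
            System.out.println(new Path(dynParent,
                    FileUtils.makePartName(partCols, partVals)));
            // .../demo_table/_DYN1234567890/ds=20110930/region=us

            // Non-partitioned table: everything goes to a _TEMP directory until commit time.
            System.out.println(new Path(tableLocation, "_TEMP"));
            // .../demo_table/_TEMP
        }
    }
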
Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java?rev=1177788&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java Fri Sep 30 19:23:21 2011
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.common.ErrorType;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.data.HCatRecord;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Part of the FileOutput*Container family of classes.
+ * See {@link FileOutputFormatContainer} for more information.
+ */
+class FileRecordWriterContainer extends RecordWriterContainer {
+
+    private final HCatOutputStorageDriver storageDriver;
+
+    private boolean dynamicPartitioningUsed = false;
+
+//    static final private Log LOG = LogFactory.getLog(FileRecordWriterContainer.class);
+
+    private final Map<Integer,RecordWriter<? super WritableComparable<?>, ? super Writable>> baseDynamicWriters;
+    private final Map<Integer,HCatOutputStorageDriver> baseDynamicStorageDrivers;
+    private final Map<Integer,OutputCommitter> baseDynamicCommitters;
+
+
+    private final List<Integer> partColsToDel;
+    private final List<Integer> dynamicPartCols;
+    private int maxDynamicPartitions;
+
+    private OutputJobInfo jobInfo;
+    private TaskAttemptContext context;
+
+    /**
+     * @param baseWriter RecordWriter to contain
+     * @param context current TaskAttemptContext
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    public FileRecordWriterContainer(RecordWriter<? super WritableComparable<?>, ? super Writable> baseWriter,
+                                     TaskAttemptContext context) throws IOException, InterruptedException {
+        super(context,baseWriter);
+        this.context = context;
+        jobInfo = HCatOutputFormat.getJobInfo(context);
+        storageDriver = HCatOutputFormat.getOutputDriverInstance(context,jobInfo);
+
+        // If partition columns occur in data, we want to remove them.
+        partColsToDel = jobInfo.getPosOfPartCols();
+        dynamicPartitioningUsed = jobInfo.isDynamicPartitioningUsed();
+        dynamicPartCols = jobInfo.getPosOfDynPartCols();
+        maxDynamicPartitions = jobInfo.getMaxDynamicPartitions();
+
+        if((partColsToDel == null) || (dynamicPartitioningUsed && (dynamicPartCols == null))){
+            throw new HCatException("It seems that setSchema() was not called on " +
+                    "HCatOutputFormat. Please make sure it is called before writing.");
+        }
+
+
+        if (!dynamicPartitioningUsed) {
+            this.baseDynamicStorageDrivers = null;
+            this.baseDynamicWriters = null;
+            this.baseDynamicCommitters = null;
+            prepareForStorageDriverOutput(context);
+        }
+        else {
+            this.baseDynamicStorageDrivers = new HashMap<Integer,HCatOutputStorageDriver>();
+            this.baseDynamicWriters = new HashMap<Integer,RecordWriter<? super WritableComparable<?>, ? super Writable>>();
+            this.baseDynamicCommitters = new HashMap<Integer,OutputCommitter>();
+        }
+    }
+
+    /**
+     * @return the storageDriver
+     */
+    public HCatOutputStorageDriver getStorageDriver() {
+        return storageDriver;
+    }
+
+    @Override
+    public void close(TaskAttemptContext context) throws IOException,
+            InterruptedException {
+        if (dynamicPartitioningUsed){
+            for (RecordWriter<? super WritableComparable<?>, ? super Writable> bwriter : baseDynamicWriters.values()){
+                bwriter.close(context);
+            }
+            for(Map.Entry<Integer, OutputCommitter> entry : baseDynamicCommitters.entrySet()) {
+//            for (HCatOutputStorageDriver osd : baseDynamicStorageDrivers.values()){
+                OutputCommitter baseOutputCommitter = entry.getValue();
+                if (baseOutputCommitter.needsTaskCommit(context)){
+                    baseOutputCommitter.commitTask(context);
+                }
+            }
+        } else {
+            getBaseRecordWriter().close(context);
+        }
+    }
+
+    @Override
+    public void write(WritableComparable<?> key, HCatRecord value) throws IOException,
+            InterruptedException {
+        RecordWriter localWriter;
+        HCatOutputStorageDriver localDriver;
+
+//      HCatUtil.logList(LOG, "HCatRecord to write", value.getAll());
+
+        if (dynamicPartitioningUsed){
+            // Determine which writer to use from the dynamic partition values; this must be done before the partition columns are removed
+            List<String> dynamicPartValues = new ArrayList<String>();
+            for (Integer colToAppend :  dynamicPartCols){
+                dynamicPartValues.add(value.get(colToAppend).toString());
+            }
+
+            int dynHashCode = dynamicPartValues.hashCode();
+            if (!baseDynamicWriters.containsKey(dynHashCode)){
+//          LOG.info("Creating new storage driver["+baseDynamicStorageDrivers.size()
+//              +"/"+maxDynamicPartitions+ "] for "+dynamicPartValues.toString());
+                if ((maxDynamicPartitions != -1) && (baseDynamicStorageDrivers.size() > maxDynamicPartitions)){
+                    throw new HCatException(ErrorType.ERROR_TOO_MANY_DYNAMIC_PTNS,
+                            "Number of dynamic partitions being created "
+                                    + "exceeds configured max allowable partitions["
+                                    + maxDynamicPartitions
+                                    + "], increase parameter ["
+                                    + HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname
+                                    + "] if needed.");
+                }
+//          HCatUtil.logList(LOG, "dynamicpartvals", dynamicPartValues);
+//          HCatUtil.logList(LOG, "dynamicpartCols", dynamicPartCols);
+
+                HCatOutputStorageDriver localOsd = createDynamicStorageDriver(dynamicPartValues);
+
+                RecordWriter baseRecordWriter = localOsd.getOutputFormat().getRecordWriter(context);
+                OutputCommitter baseOutputCommitter = localOsd.getOutputFormat().getOutputCommitter(context);
+                baseOutputCommitter.setupJob(context);
+                baseOutputCommitter.setupTask(context);
+                prepareForStorageDriverOutput(localOsd,context);
+                baseDynamicWriters.put(dynHashCode, baseRecordWriter);
+                baseDynamicStorageDrivers.put(dynHashCode,localOsd);
+                baseDynamicCommitters.put(dynHashCode,baseOutputCommitter);
+            }
+
+            localWriter = baseDynamicWriters.get(dynHashCode);
+            localDriver = baseDynamicStorageDrivers.get(dynHashCode);
+        } else {
+            localWriter = getBaseRecordWriter();
+            localDriver = storageDriver;
+        }
+
+        for(Integer colToDel : partColsToDel){
+            value.remove(colToDel);
+        }
+
+        // The key supplied by the user is ignored
+        WritableComparable<?> generatedKey = localDriver.generateKey(value);
+        Writable convertedValue = localDriver.convertValue(value);
+        localWriter.write(generatedKey, convertedValue);
+    }
+
+    protected HCatOutputStorageDriver createDynamicStorageDriver(List<String> dynamicPartVals) throws IOException {
+        HCatOutputStorageDriver localOsd = HCatOutputFormat.getOutputDriverInstance(context,jobInfo,dynamicPartVals);
+        return localOsd;
+    }
+
+    public void prepareForStorageDriverOutput(TaskAttemptContext context) throws IOException {
+        // Set permissions and group on freshly created files.
+        if (!dynamicPartitioningUsed){
+            HCatOutputStorageDriver localOsd = this.getStorageDriver();
+            prepareForStorageDriverOutput(localOsd,context);
+        }
+    }
+
+    private void prepareForStorageDriverOutput(HCatOutputStorageDriver localOsd,
+                                               TaskAttemptContext context) throws IOException {
+        FileOutputStorageDriver.prepareOutputLocation(localOsd, context);
+    }
+
+}

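The per-partition writer cache in FileRecordWriterContainer.write() above keys writers by the hashCode of the dynamic-partition-value list. A minimal standalone sketch of that caching pattern follows; the record values and partition limit are made up, and strings stand in for the real RecordWriter/OutputCommitter/storage-driver triple.

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class DynamicWriterCacheSketch {
        public static void main(String[] args) {
            // One "writer" per distinct combination of dynamic partition values, cached
            // under the hashCode of the value list, as baseDynamicWriters does above.
            Map<Integer, String> writers = new HashMap<Integer, String>();
            int maxDynamicPartitions = 2; // would come from jobInfo.getMaxDynamicPartitions()

            String[][] records = { {"20110930", "us"}, {"20110930", "eu"}, {"20110930", "us"} };
            for (String[] record : records) {
                List<String> dynamicPartValues = Arrays.asList(record);
                int dynHashCode = dynamicPartValues.hashCode();
                if (!writers.containsKey(dynHashCode)) {
                    if (maxDynamicPartitions != -1 && writers.size() > maxDynamicPartitions) {
                        throw new IllegalStateException("too many dynamic partitions");
                    }
                    writers.put(dynHashCode, "writer for " + dynamicPartValues);
                }
                System.out.println(writers.get(dynHashCode));
            }
            // Prints the "us" writer, the "eu" writer, then the cached "us" writer again.
        }
    }
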
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java?rev=1177788&r1=1177787&r2=1177788&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java Fri Sep 30 19:23:21 2011
@@ -20,15 +20,14 @@ package org.apache.hcatalog.mapreduce;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.OutputFormat;
@@ -63,8 +62,7 @@ public abstract class HCatBaseOutputForm
   @Override
   public void checkOutputSpecs(JobContext context
                                         ) throws IOException, InterruptedException {
-      OutputFormat<? super WritableComparable<?>, ? super Writable> outputFormat = getOutputFormat(context);
-      outputFormat.checkOutputSpecs(context);
+      getOutputFormat(context).checkOutputSpecs(context);
   }
 
   /**
@@ -73,13 +71,10 @@ public abstract class HCatBaseOutputForm
    * @return the output format instance
    * @throws IOException
    */
-  protected OutputFormat<? super WritableComparable<?>, ? super Writable> getOutputFormat(JobContext context) throws IOException {
+  protected OutputFormat<WritableComparable<?>, HCatRecord> getOutputFormat(JobContext context) throws IOException {
       OutputJobInfo jobInfo = getJobInfo(context);
       HCatOutputStorageDriver  driver = getOutputDriverInstance(context, jobInfo);
-
-      OutputFormat<? super WritableComparable<?>, ? super Writable> outputFormat =
-            driver.getOutputFormat();
-      return outputFormat;
+      return driver.getOutputFormatContainer(driver.getOutputFormat());
   }
 
   /**
@@ -243,4 +238,24 @@ public abstract class HCatBaseOutputForm
     jobInfo.setPosOfDynPartCols(posOfDynPartCols);
     jobInfo.setOutputSchema(schemaWithoutParts);
   }
+
+  static void cancelDelegationTokens(JobContext context, OutputJobInfo outputJobInfo) throws Exception {
+    HiveMetaStoreClient client = HCatOutputFormat.createHiveClient(outputJobInfo.getServerUri(), context.getConfiguration());
+    // Cancel the delegation tokens that were acquired for this job now that we are
+    // done. Only cancel tokens that HCatOutputFormat acquired itself, not tokens
+    // supplied by Oozie; in the latter case the HCAT_KEY_TOKEN_SIGNATURE property
+    // will not be set in the conf.
+    String tokenStrForm = client.getTokenStrForm();
+    if(tokenStrForm != null && context.getConfiguration().get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
+      client.cancelDelegationToken(tokenStrForm);
+    }
+
+    String jcTokenStrForm =
+      context.getConfiguration().get(HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_STRFORM);
+    String jcTokenSignature =
+      context.getConfiguration().get(HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_SIGNATURE);
+    if(jcTokenStrForm != null && jcTokenSignature != null) {
+      HCatUtil.cancelJobTrackerDelegationToken(jcTokenStrForm, jcTokenSignature);
+    }
+  }
 }

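A minimal sketch of the two independent decisions cancelDelegationTokens() makes above, with the metastore and JobTracker token lookups replaced by hard-coded placeholder values (the names mirror the method, the values are made up).

    public class TokenCancelSketch {
        public static void main(String[] args) {
            // Metastore token: cancel only when HCatOutputFormat acquired it itself,
            // signalled by HCAT_KEY_TOKEN_SIGNATURE being present in the job conf;
            // Oozie-supplied tokens leave that property unset and are left alone.
            String tokenStrForm = "placeholder-metastore-token";  // client.getTokenStrForm()
            String hcatTokenSignature = "placeholder-signature";  // conf.get(HCAT_KEY_TOKEN_SIGNATURE)
            boolean cancelMetastoreToken = (tokenStrForm != null) && (hcatTokenSignature != null);

            // JobTracker token: cancel only when both its string form and signature
            // were stored in the conf by HCatOutputFormat.
            String jcTokenStrForm = null;   // conf.get(HCAT_KEY_JOBCLIENT_TOKEN_STRFORM)
            String jcTokenSignature = null; // conf.get(HCAT_KEY_JOBCLIENT_TOKEN_SIGNATURE)
            boolean cancelJobTrackerToken = (jcTokenStrForm != null) && (jcTokenSignature != null);

            System.out.println("cancel metastore token:  " + cancelMetastoreToken);  // true
            System.out.println("cancel jobtracker token: " + cancelJobTrackerToken); // false
        }
    }
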
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputCommitter.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputCommitter.java?rev=1177788&r1=1177787&r2=1177788&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputCommitter.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputCommitter.java Fri Sep 30 19:23:21 2011
@@ -39,17 +39,66 @@ import org.apache.hadoop.hive.ql.metadat
 import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hcatalog.common.ErrorType;
 import org.apache.hcatalog.common.HCatException;
 
-public class HCatEximOutputCommitter extends HCatBaseOutputCommitter {
+public class HCatEximOutputCommitter extends OutputCommitter {
 
   private static final Log LOG = LogFactory.getLog(HCatEximOutputCommitter.class);
 
+  private final OutputCommitter baseCommitter;
+
   public HCatEximOutputCommitter(JobContext context, OutputCommitter baseCommitter) {
-    super(context,baseCommitter);
+    this.baseCommitter = baseCommitter;
+  }
+
+  @Override
+  public void abortTask(TaskAttemptContext context) throws IOException {
+      baseCommitter.abortTask(context);
+  }
+
+  @Override
+  public void commitTask(TaskAttemptContext context) throws IOException {
+      baseCommitter.commitTask(context);
+  }
+
+  @Override
+  public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
+      return baseCommitter.needsTaskCommit(context);
+  }
+
+  @Override
+  public void setupJob(JobContext context) throws IOException {
+    if( baseCommitter != null ) {
+      baseCommitter.setupJob(context);
+    }
+  }
+
+  @Override
+  public void setupTask(TaskAttemptContext context) throws IOException {
+      baseCommitter.setupTask(context);
+  }
+
+  @Override
+  public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException {
+    if(baseCommitter != null) {
+      baseCommitter.abortJob(jobContext, state);
+    }
+    OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext);
+
+    Path src = new Path(jobInfo.getLocation());
+    FileSystem fs = src.getFileSystem(jobContext.getConfiguration());
+    fs.delete(src, true);
+  }
+
+  @Override
+  public void commitJob(JobContext jobContext) throws IOException {
+    if(baseCommitter != null) {
+      baseCommitter.commitJob(jobContext);
+    }
   }
 
   @Override

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputFormat.java?rev=1177788&r1=1177787&r2=1177788&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputFormat.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputFormat.java Fri Sep 30 19:23:21 2011
@@ -35,11 +35,10 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
 import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
 import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hcatalog.common.ErrorType;
@@ -76,8 +75,7 @@ public class HCatEximOutputFormat extend
   public RecordWriter<WritableComparable<?>, HCatRecord>
       getRecordWriter(TaskAttemptContext context
                       ) throws IOException, InterruptedException {
-    HCatRecordWriter rw = new HCatRecordWriter(context);
-    return rw;
+    return getOutputFormat(context).getRecordWriter(context);
   }
 
   /**
@@ -90,8 +88,18 @@ public class HCatEximOutputFormat extend
    */
   @Override
   public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
-      OutputFormat<? super WritableComparable<?>, ? super Writable> outputFormat = getOutputFormat(context);
-      return new HCatEximOutputCommitter(context,outputFormat.getOutputCommitter(context));
+      return new HCatEximOutputCommitter(context,((OutputCommitterContainer)getOutputFormat(context).getOutputCommitter(context)).getBaseOutputCommitter());
+  }
+
+  /**
+   * Check for validity of the output-specification for the job.
+   * @param context information about the job
+   * @throws IOException when output should not be attempted
+   */
+  @Override
+  public void checkOutputSpecs(JobContext context
+                                        ) throws IOException, InterruptedException {
+      ((OutputFormatContainer)getOutputFormat(context)).getBaseOutputFormat().checkOutputSpecs(context);
   }
 
   public static void setOutput(Job job, String dbname, String tablename, String location,


