incubator-hcatalog-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From avand...@apache.org
Subject svn commit: r1393259 - in /incubator/hcatalog/trunk: ./ hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/ src/java/org/apache/hcatalog/mapreduce/ src/test/org/apache/hcatalog/mapreduce/
Date Wed, 03 Oct 2012 02:32:14 GMT
Author: avandana
Date: Wed Oct  3 02:32:14 2012
New Revision: 1393259

URL: http://svn.apache.org/viewvc?rev=1393259&view=rev
Log:
HCAT-451 Partitions are created even when Jobs are aborted

Added:
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitionPublish.java
Modified:
    incubator/hcatalog/trunk/CHANGES.txt
    incubator/hcatalog/trunk/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatStorer.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InternalUtil.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java

Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1393259&r1=1393258&r2=1393259&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Wed Oct  3 02:32:14 2012
@@ -119,6 +119,8 @@ Trunk (unreleased changes)
   OPTIMIZATIONS
 
   BUG FIXES
+  HCAT-451 Partitions are created even when Jobs are aborted (avandana)
+
   HCAT-513  Data Store onto HCatalog table fails for dynamic partitioning as the temporary directory gets deleted by the completed map tasks  (amalakar via toffer)
 
   HCAT-497 HCatContext should use the jobconf instead of its own conf (traviscrawford)

Modified: incubator/hcatalog/trunk/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatStorer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatStorer.java?rev=1393259&r1=1393258&r2=1393259&view=diff
==============================================================================
--- incubator/hcatalog/trunk/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatStorer.java (original)
+++ incubator/hcatalog/trunk/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatStorer.java Wed Oct  3 02:32:14 2012
@@ -17,6 +17,7 @@
  */
 package org.apache.hcatalog.pig;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -25,6 +26,7 @@ import java.util.Iterator;
 import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.hcatalog.HcatTestUtils;
 import org.apache.hcatalog.mapreduce.HCatBaseTest;
+import org.apache.pig.EvalFunc;
 import org.apache.pig.ExecType;
 import org.apache.pig.PigException;
 import org.apache.pig.PigServer;
@@ -593,4 +595,62 @@ public class TestHCatStorer extends HCat
         Assert.assertEquals(0, results.size());
         driver.run("drop table employee");
     }
+
+    public void testPartitionPublish()
+        throws IOException, CommandNeedRetryException {
+
+        driver.run("drop table ptn_fail");
+        String createTable = "create table ptn_fail(a int, c string) partitioned by (b string) stored as RCFILE";
+        int retCode = driver.run(createTable).getResponseCode();
+        if (retCode != 0) {
+            throw new IOException("Failed to create table.");
+        }
+        int LOOP_SIZE = 11;
+        String[] input = new String[LOOP_SIZE];
+
+        for (int i = 0; i < LOOP_SIZE; i++) {
+            input[i] = i + "\tmath";
+        }
+        HcatTestUtils.createTestDataFile(INPUT_FILE_NAME, input);
+        PigServer server = new PigServer(ExecType.LOCAL);
+        server.setBatchOn();
+        server.registerQuery("A = load '" + INPUT_FILE_NAME
+                + "' as (a:int, c:chararray);");
+        server.registerQuery("B = filter A by " + FailEvalFunc.class.getName()
+                + "($0);");
+        server.registerQuery("store B into 'ptn_fail' using "
+                + HCatStorer.class.getName() + "('b=math');");
+        server.executeBatch();
+
+        String query = "show partitions ptn_fail";
+        retCode = driver.run(query).getResponseCode();
+
+        if (retCode != 0) {
+            throw new IOException("Error " + retCode + " running query "
+                    + query);
+        }
+
+        ArrayList<String> res = new ArrayList<String>();
+        driver.getResults(res);
+        Assert.assertEquals(0, res.size());
+
+        // Make sure the partitions directory is not in hdfs.
+        Assert.assertTrue((new File(TEST_WAREHOUSE_DIR + "/ptn_fail")).exists());
+        Assert.assertFalse((new File(TEST_WAREHOUSE_DIR + "/ptn_fail/b=math"))
+                .exists());
+    }
+
+    static public class FailEvalFunc extends EvalFunc<Boolean> {
+
+        /*
+         * @param Tuple /* @return null /* @throws IOException
+         *
+         * @see org.apache.pig.EvalFunc#exec(org.apache.pig.data.Tuple)
+         */
+        @Override
+        public Boolean exec(Tuple tuple) throws IOException {
+            throw new IOException("Eval Func to mimic Failure.");
+        }
+
+    }
 }

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java?rev=1393259&r1=1393258&r2=1393259&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java Wed Oct  3 02:32:14 2012
@@ -18,6 +18,15 @@
 
 package org.apache.hcatalog.mapreduce;
 
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -52,15 +61,6 @@ import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
 /**
  * Part of the FileOutput*Container classes
  * See {@link FileOutputFormatContainer} for more information
@@ -139,59 +139,40 @@ class FileOutputCommitterContainer exten
 
     @Override
     public void abortJob(JobContext jobContext, State state) throws IOException {
-        org.apache.hadoop.mapred.JobContext
-            mapRedJobContext = HCatMapRedUtil.createJobContext(jobContext);
-        if (dynamicPartitioningUsed) {
-            discoverPartitions(jobContext);
-        }
-
-        if (getBaseOutputCommitter() != null && !dynamicPartitioningUsed) {
-            getBaseOutputCommitter().abortJob(mapRedJobContext, state);
-        } else if (dynamicPartitioningUsed) {
-            for (JobContext currContext : contextDiscoveredByPath.values()) {
-                try {
-                    new JobConf(currContext.getConfiguration()).getOutputCommitter().abortJob(currContext, state);
-                } catch (Exception e) {
-                    throw new IOException(e);
-                }
-            }
-        }
-
-        HiveMetaStoreClient client = null;
         try {
-            HiveConf hiveConf = HCatUtil.getHiveConf(jobContext.getConfiguration());
-            client = HCatUtil.getHiveClient(hiveConf);
-            // cancel the deleg. tokens that were acquired for this job now that
-            // we are done - we should cancel if the tokens were acquired by
-            // HCatOutputFormat and not if they were supplied by Oozie.
-            // In the latter case the HCAT_KEY_TOKEN_SIGNATURE property in
-            // the conf will not be set
-            String tokenStrForm = client.getTokenStrForm();
-            if (tokenStrForm != null && jobContext.getConfiguration().get
-                (HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
-                client.cancelDelegationToken(tokenStrForm);
+            if (dynamicPartitioningUsed) {
+                discoverPartitions(jobContext);
             }
-        } catch (Exception e) {
-            if (e instanceof HCatException) {
-                throw (HCatException) e;
+            org.apache.hadoop.mapred.JobContext mapRedJobContext = HCatMapRedUtil
+                    .createJobContext(jobContext);
+            if (getBaseOutputCommitter() != null && !dynamicPartitioningUsed) {
+                getBaseOutputCommitter().abortJob(mapRedJobContext, state);
+            } else if (dynamicPartitioningUsed) {
+                for (JobContext currContext : contextDiscoveredByPath.values()) {
+                    try {
+                        new JobConf(currContext.getConfiguration())
+                                .getOutputCommitter().abortJob(currContext,
+                                        state);
+                    } catch (Exception e) {
+                        throw new IOException(e);
+                    }
+                }
+            }
+            Path src;
+            OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext);
+            if (dynamicPartitioningUsed) {
+                src = new Path(getPartitionRootLocation(jobInfo.getLocation()
+                        .toString(), jobInfo.getTableInfo().getTable()
+                        .getPartitionKeysSize()));
             } else {
-                throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, e);
+                src = new Path(jobInfo.getLocation());
             }
+            FileSystem fs = src.getFileSystem(jobContext.getConfiguration());
+            LOG.info("Job failed. Cleaning up temporary directory [{}].", src);
+            fs.delete(src, true);
         } finally {
-            HCatUtil.closeHiveClientQuietly(client);
+            cancelDelegationTokens(jobContext);
         }
-
-        Path src;
-        OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext);
-        if (dynamicPartitioningUsed) {
-            src = new Path(getPartitionRootLocation(jobInfo.getLocation(),
-                jobInfo.getTableInfo().getTable().getPartitionKeysSize()));
-        } else {
-            src = new Path(jobInfo.getLocation());
-        }
-        FileSystem fs = src.getFileSystem(jobContext.getConfiguration());
-//      LOG.warn("abortJob about to delete ["+src.toString() +"]");
-        fs.delete(src, true);
     }
 
     public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
@@ -205,191 +186,50 @@ class FileOutputCommitterContainer exten
 
     @Override
     public void commitJob(JobContext jobContext) throws IOException {
-        if (dynamicPartitioningUsed) {
-            discoverPartitions(jobContext);
-            // Commit each partition so it gets moved out of the job work dir
-            for (JobContext context : contextDiscoveredByPath.values()) {
-                new JobConf(context.getConfiguration()).getOutputCommitter().commitJob(context);
-            }
-        }
-        if (getBaseOutputCommitter() != null && !dynamicPartitioningUsed) {
-            getBaseOutputCommitter().commitJob(HCatMapRedUtil.createJobContext(jobContext));
-        }
-        // create _SUCCESS FILE if so requested.
-        OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext);
-        if (getOutputDirMarking(jobContext.getConfiguration())) {
-            Path outputPath = new Path(jobInfo.getLocation());
-            if (outputPath != null) {
-                FileSystem fileSys = outputPath.getFileSystem(jobContext.getConfiguration());
-                // create a file in the folder to mark it
-                if (fileSys.exists(outputPath)) {
-                    Path filePath = new Path(outputPath, SUCCEEDED_FILE_NAME);
-                    if (!fileSys.exists(filePath)) { // may have been created by baseCommitter.commitJob()
-                        fileSys.create(filePath).close();
-                    }
-                }
-            }
-        }
-        cleanupJob(jobContext);
-    }
-
-    @Override
-    public void cleanupJob(JobContext context) throws IOException {
-
-        if (dynamicPartitioningUsed) {
-            discoverPartitions(context);
-        }
-
-
-        OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context);
-        Configuration conf = context.getConfiguration();
-        Table table = new Table(jobInfo.getTableInfo().getTable());
-        Path tblPath = new Path(table.getTTable().getSd().getLocation());
-        FileSystem fs = tblPath.getFileSystem(conf);
-
-        if (table.getPartitionKeys().size() == 0) {
-            //non partitioned table
-            if (getBaseOutputCommitter() != null && !dynamicPartitioningUsed) {
-                getBaseOutputCommitter().cleanupJob(HCatMapRedUtil.createJobContext(context));
-            } else if (dynamicPartitioningUsed) {
-                for (JobContext currContext : contextDiscoveredByPath.values()) {
-                    try {
-                        JobConf jobConf = new JobConf(currContext.getConfiguration());
-                        jobConf.getOutputCommitter().cleanupJob(currContext);
-                    } catch (Exception e) {
-                        throw new IOException(e);
-                    }
-                }
-            }
-
-            //Move data from temp directory the actual table directory
-            //No metastore operation required.
-            Path src = new Path(jobInfo.getLocation());
-            moveTaskOutputs(fs, src, src, tblPath, false);
-            fs.delete(src, true);
-            return;
-        }
-
-        HiveMetaStoreClient client = null;
-        HCatTableInfo tableInfo = jobInfo.getTableInfo();
-
-        List<Partition> partitionsAdded = new ArrayList<Partition>();
-
         try {
-            HiveConf hiveConf = HCatUtil.getHiveConf(conf);
-            client = HCatUtil.getHiveClient(hiveConf);
-
-            StorerInfo storer =
-                InternalUtil.extractStorerInfo(table.getTTable().getSd(), table.getParameters());
-
-            updateTableSchema(client, table, jobInfo.getOutputSchema());
-
-            FileStatus tblStat = fs.getFileStatus(tblPath);
-            String grpName = tblStat.getGroup();
-            FsPermission perms = tblStat.getPermission();
-
-            List<Partition> partitionsToAdd = new ArrayList<Partition>();
-            if (!dynamicPartitioningUsed) {
-                partitionsToAdd.add(
-                    constructPartition(
-                        context, jobInfo,
-                        tblPath.toString(), jobInfo.getPartitionValues()
-                        , jobInfo.getOutputSchema(), getStorerParameterMap(storer)
-                        , table, fs
-                        , grpName, perms));
-            } else {
-                for (Entry<String, Map<String, String>> entry : partitionsDiscoveredByPath.entrySet()) {
-                    partitionsToAdd.add(
-                        constructPartition(
-                            context, jobInfo,
-                            getPartitionRootLocation(entry.getKey(), entry.getValue().size()), entry.getValue()
-                            , jobInfo.getOutputSchema(), getStorerParameterMap(storer)
-                            , table, fs
-                            , grpName, perms));
+            if (dynamicPartitioningUsed) {
+                discoverPartitions(jobContext);
+                // Commit each partition so it gets moved out of the job work
+                // dir
+                for (JobContext context : contextDiscoveredByPath.values()) {
+                    new JobConf(context.getConfiguration())
+                            .getOutputCommitter().commitJob(context);
                 }
             }
-
-            //Publish the new partition(s)
-            if (dynamicPartitioningUsed && harProcessor.isEnabled() && (!partitionsToAdd.isEmpty())) {
-
-                Path src = new Path(ptnRootLocation);
-
-                // check here for each dir we're copying out, to see if it already exists, error out if so
-                moveTaskOutputs(fs, src, src, tblPath, true);
-
-                moveTaskOutputs(fs, src, src, tblPath, false);
-                fs.delete(src, true);
-
-
-//          for (Partition partition : partitionsToAdd){
-//            partitionsAdded.add(client.add_partition(partition));
-//            // currently following add_partition instead of add_partitions because latter isn't
-//            // all-or-nothing and we want to be able to roll back partitions we added if need be.
-//          }
-
-                try {
-                    client.add_partitions(partitionsToAdd);
-                    partitionsAdded = partitionsToAdd;
-                } catch (Exception e) {
-                    // There was an error adding partitions : rollback fs copy and rethrow
-                    for (Partition p : partitionsToAdd) {
-                        Path ptnPath = new Path(harProcessor.getParentFSPath(new Path(p.getSd().getLocation())));
-                        if (fs.exists(ptnPath)) {
-                            fs.delete(ptnPath, true);
-                        }
-                    }
-                    throw e;
-                }
-
-            } else {
-                // no harProcessor, regular operation
-
-                // No duplicate partition publish case to worry about because we'll
-                // get a AlreadyExistsException here if so, and appropriately rollback
-
-                client.add_partitions(partitionsToAdd);
-                partitionsAdded = partitionsToAdd;
-
-                if (dynamicPartitioningUsed && (partitionsAdded.size() > 0)) {
-                    Path src = new Path(ptnRootLocation);
-                    moveTaskOutputs(fs, src, src, tblPath, false);
-                    fs.delete(src, true);
-                }
-
-            }
-
             if (getBaseOutputCommitter() != null && !dynamicPartitioningUsed) {
-                getBaseOutputCommitter().cleanupJob(HCatMapRedUtil.createJobContext(context));
+                getBaseOutputCommitter().commitJob(
+                        HCatMapRedUtil.createJobContext(jobContext));
             }
-
-            if (Security.getInstance().isSecurityEnabled()) {
-                Security.getInstance().cancelToken(client, context);
-            }
-        } catch (Exception e) {
-
-            if (partitionsAdded.size() > 0) {
-                try {
-                    //baseCommitter.cleanupJob failed, try to clean up the metastore
-                    for (Partition p : partitionsAdded) {
-                        client.dropPartition(tableInfo.getDatabaseName(),
-                            tableInfo.getTableName(), p.getValues());
+            registerPartitions(jobContext);
+            // create _SUCCESS FILE if so requested.
+            OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext);
+            if (getOutputDirMarking(jobContext.getConfiguration())) {
+                Path outputPath = new Path(jobInfo.getLocation());
+                if (outputPath != null) {
+                    FileSystem fileSys = outputPath.getFileSystem(jobContext
+                            .getConfiguration());
+                    // create a file in the folder to mark it
+                    if (fileSys.exists(outputPath)) {
+                        Path filePath = new Path(outputPath,
+                                SUCCEEDED_FILE_NAME);
+                        if (!fileSys.exists(filePath)) { // may have been
+                                                         // created by
+                                                         // baseCommitter.commitJob()
+                            fileSys.create(filePath).close();
+                        }
                     }
-                } catch (Exception te) {
-                    //Keep cause as the original exception
-                    throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, e);
                 }
             }
-
-            if (e instanceof HCatException) {
-                throw (HCatException) e;
-            } else {
-                throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, e);
-            }
         } finally {
-            HCatUtil.closeHiveClientQuietly(client);
+            cancelDelegationTokens(jobContext);
         }
     }
 
+    @Override
+    public void cleanupJob(JobContext context) throws IOException {
+        throw new IOException("The method cleanupJob is deprecated and should not be called.");
+    }
+
     private String getPartitionRootLocation(String ptnLocn, int numPtnKeys) {
         if (ptnRootLocation == null) {
             // we only need to calculate it once, it'll be the same for other partitions in this job.
@@ -478,7 +318,6 @@ class FileOutputCommitterContainer exten
         } else {
             partition.getSd().setLocation(partPath.toString());
         }
-
         return partition;
     }
 
@@ -701,4 +540,153 @@ class FileOutputCommitterContainer exten
         }
     }
 
+    private void registerPartitions(JobContext context) throws IOException{
+        if (dynamicPartitioningUsed){
+            discoverPartitions(context);
+        }
+        OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context);
+        Configuration conf = context.getConfiguration();
+        Table table = new Table(jobInfo.getTableInfo().getTable());
+        Path tblPath = new Path(table.getTTable().getSd().getLocation());
+        FileSystem fs = tblPath.getFileSystem(conf);
+
+        if( table.getPartitionKeys().size() == 0 ) {
+            //Move data from temp directory the actual table directory
+            //No metastore operation required.
+            Path src = new Path(jobInfo.getLocation());
+            moveTaskOutputs(fs, src, src, tblPath, false);
+            fs.delete(src, true);
+            return;
+        }
+
+        HiveMetaStoreClient client = null;
+        HCatTableInfo tableInfo = jobInfo.getTableInfo();
+        List<Partition> partitionsAdded = new ArrayList<Partition>();
+        try {
+            HiveConf hiveConf = HCatUtil.getHiveConf(conf);
+            client = HCatUtil.getHiveClient(hiveConf);
+            StorerInfo storer = InternalUtil.extractStorerInfo(table.getTTable().getSd(),table.getParameters());
+
+            FileStatus tblStat = fs.getFileStatus(tblPath);
+            String grpName = tblStat.getGroup();
+            FsPermission perms = tblStat.getPermission();
+
+            List<Partition> partitionsToAdd = new ArrayList<Partition>();
+            if (!dynamicPartitioningUsed){
+                partitionsToAdd.add(
+                        constructPartition(
+                                context,jobInfo,
+                                tblPath.toString(), jobInfo.getPartitionValues()
+                                ,jobInfo.getOutputSchema(), getStorerParameterMap(storer)
+                                ,table, fs
+                                ,grpName,perms));
+            }else{
+                for (Entry<String,Map<String,String>> entry : partitionsDiscoveredByPath.entrySet()){
+                    partitionsToAdd.add(
+                            constructPartition(
+                                    context,jobInfo,
+                                    getPartitionRootLocation(entry.getKey(),entry.getValue().size()), entry.getValue()
+                                    ,jobInfo.getOutputSchema(), getStorerParameterMap(storer)
+                                    ,table, fs
+                                    ,grpName,perms));
+                }
+            }
+
+            ArrayList<Map<String,String>> ptnInfos = new ArrayList<Map<String,String>>();
+            for(Partition ptn : partitionsToAdd){
+                ptnInfos.add(InternalUtil.createPtnKeyValueMap(new Table(tableInfo.getTable()), ptn));
+            }
+
+            //Publish the new partition(s)
+            if (dynamicPartitioningUsed && harProcessor.isEnabled() && (!partitionsToAdd.isEmpty())){
+
+                Path src = new Path(ptnRootLocation);
+                // check here for each dir we're copying out, to see if it
+                // already exists, error out if so
+                moveTaskOutputs(fs, src, src, tblPath,true);
+                moveTaskOutputs(fs, src, src, tblPath,false);
+                fs.delete(src, true);
+                try {
+                    updateTableSchema(client, table, jobInfo.getOutputSchema());
+                    LOG.info("The table {} has new partitions {}.", table.getTableName(),ptnInfos);
+                    client.add_partitions(partitionsToAdd);
+                    partitionsAdded = partitionsToAdd;
+                } catch (Exception e){
+                    // There was an error adding partitions : rollback fs copy and rethrow
+                    for (Partition p : partitionsToAdd){
+                        Path ptnPath = new Path(harProcessor.getParentFSPath(new Path(p.getSd().getLocation())));
+                        if (fs.exists(ptnPath)){
+                            fs.delete(ptnPath,true);
+                        }
+                    }
+                    throw e;
+                }
+
+            }else{
+                // no harProcessor, regular operation
+                // No duplicate partition publish case to worry about because we'll
+                // get a AlreadyExistsException here if so, and appropriately rollback
+                updateTableSchema(client, table, jobInfo.getOutputSchema());
+                LOG.info("The table {} has new partitions {}.", table.getTableName(),ptnInfos);
+                client.add_partitions(partitionsToAdd);
+                partitionsAdded = partitionsToAdd;
+                if (dynamicPartitioningUsed && (partitionsAdded.size()>0)){
+                    Path src = new Path(ptnRootLocation);
+                    moveTaskOutputs(fs, src, src, tblPath,false);
+                    fs.delete(src, true);
+                }
+            }
+        } catch (Exception e) {
+            if (partitionsAdded.size() > 0) {
+                try {
+                    // baseCommitter.cleanupJob failed, try to clean up the
+                    // metastore
+                    for (Partition p : partitionsAdded) {
+                        client.dropPartition(tableInfo.getDatabaseName(),
+                                tableInfo.getTableName(), p.getValues());
+                    }
+                } catch (Exception te) {
+                    // Keep cause as the original exception
+                    throw new HCatException(
+                            ErrorType.ERROR_PUBLISHING_PARTITION, e);
+                }
+            }
+            if (e instanceof HCatException) {
+                throw (HCatException) e;
+            } else {
+                throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, e);
+            }
+        } finally {
+            HCatUtil.closeHiveClientQuietly(client);
+        }
+    }
+
+    private void cancelDelegationTokens(JobContext context) throws IOException{
+        LOG.info("Cancelling deletgation token for the job.");
+        HiveMetaStoreClient client = null;
+        try {
+            HiveConf hiveConf = HCatUtil
+                    .getHiveConf(context.getConfiguration());
+            client = HCatUtil.getHiveClient(hiveConf);
+            // cancel the deleg. tokens that were acquired for this job now that
+            // we are done - we should cancel if the tokens were acquired by
+            // HCatOutputFormat and not if they were supplied by Oozie.
+            // In the latter case the HCAT_KEY_TOKEN_SIGNATURE property in
+            // the conf will not be set
+            String tokenStrForm = client.getTokenStrForm();
+            if (tokenStrForm != null
+                    && context.getConfiguration().get(
+                            HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
+                client.cancelDelegationToken(tokenStrForm);
+            }
+        } catch (MetaException e) {
+            LOG.warn("MetaException while cancelling delegation token.",e );
+        } catch (TException e) {
+            LOG.warn("TException while cancelling delegation token.", e);
+        } finally {
+            HCatUtil.closeHiveClientQuietly(client);
+        }
+    }
+
+
 }

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java?rev=1393259&r1=1393258&r2=1393259&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java Wed Oct  3 02:32:14 2012
@@ -27,7 +27,6 @@ import java.util.Properties;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.ql.metadata.Table;
@@ -119,7 +118,7 @@ public class InitializeInput {
                         new org.apache.hadoop.hive.ql.metadata.Partition(table, ptn));
                     PartInfo partInfo = extractPartInfo(schema, ptn.getSd(),
                         ptn.getParameters(), job.getConfiguration(), inputJobInfo);
-                    partInfo.setPartitionValues(createPtnKeyValueMap(table, ptn));
+                    partInfo.setPartitionValues(InternalUtil.createPtnKeyValueMap(table, ptn));
                     partInfoList.add(partInfo);
                 }
 
@@ -140,27 +139,6 @@ public class InitializeInput {
 
     }
 
-    private static Map<String, String> createPtnKeyValueMap(Table table, Partition ptn) throws IOException {
-        List<String> values = ptn.getValues();
-        if (values.size() != table.getPartitionKeys().size()) {
-            throw new IOException("Partition values in partition inconsistent with table definition, table "
-                + table.getTableName() + " has "
-                + table.getPartitionKeys().size()
-                + " partition keys, partition has " + values.size() + "partition values");
-        }
-
-        Map<String, String> ptnKeyValues = new HashMap<String, String>();
-
-        int i = 0;
-        for (FieldSchema schema : table.getPartitionKeys()) {
-            // CONCERN : the way this mapping goes, the order *needs* to be preserved for table.getPartitionKeys() and ptn.getValues()
-            ptnKeyValues.put(schema.getName().toLowerCase(), values.get(i));
-            i++;
-        }
-
-        return ptnKeyValues;
-    }
-
     private static PartInfo extractPartInfo(HCatSchema schema, StorageDescriptor sd,
                                             Map<String, String> parameters, Configuration conf,
                                             InputJobInfo inputJobInfo) throws IOException {

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InternalUtil.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InternalUtil.java?rev=1393259&r1=1393258&r2=1393259&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InternalUtil.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InternalUtil.java Wed Oct  3 02:32:14 2012
@@ -21,7 +21,9 @@ package org.apache.hcatalog.mapreduce;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
@@ -48,6 +50,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -185,4 +188,30 @@ class InternalUtil {
                 + " but found " + split.getClass().getName());
         }
     }
+
+
+    static Map<String, String> createPtnKeyValueMap(Table table, Partition ptn)
+        throws IOException {
+        List<String> values = ptn.getValues();
+        if (values.size() != table.getPartitionKeys().size()) {
+            throw new IOException(
+                    "Partition values in partition inconsistent with table definition, table "
+                            + table.getTableName() + " has "
+                            + table.getPartitionKeys().size()
+                            + " partition keys, partition has " + values.size()
+                            + "partition values");
+        }
+
+        Map<String, String> ptnKeyValues = new HashMap<String, String>();
+
+        int i = 0;
+        for (FieldSchema schema : table.getPartitionKeys()) {
+            // CONCERN : the way this mapping goes, the order *needs* to be
+            // preserved for table.getPartitionKeys() and ptn.getValues()
+            ptnKeyValues.put(schema.getName().toLowerCase(), values.get(i));
+            i++;
+        }
+
+        return ptnKeyValues;
+    }
 }

Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java?rev=1393259&r1=1393258&r2=1393259&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java (original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java Wed Oct  3 02:32:14 2012
@@ -155,7 +155,7 @@ public class TestHCatOutputFormat extend
 
     public void publishTest(Job job) throws Exception {
         OutputCommitter committer = new FileOutputCommitterContainer(job, null);
-        committer.cleanupJob(job);
+        committer.commitJob(job);
 
         Partition part = client.getPartition(dbName, tblName, Arrays.asList("p1"));
         assertNotNull(part);

Added: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitionPublish.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitionPublish.java?rev=1393259&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitionPublish.java (added)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitionPublish.java Wed Oct  3 02:32:14 2012
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.mapreduce;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hcatalog.NoExitSecurityManager;
+import org.apache.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestHCatPartitionPublish {
+    private static Configuration mrConf = null;
+    private static FileSystem fs = null;
+    private static MiniMRCluster mrCluster = null;
+    private static boolean isServerRunning = false;
+    private static final int msPort = 20101;
+    private static HiveConf hcatConf;
+    private static HiveMetaStoreClient msc;
+    private static SecurityManager securityManager;
+
+    @BeforeClass
+    public static void setup() throws Exception {
+        Configuration conf = new Configuration(true);
+        fs = FileSystem.get(conf);
+        System.setProperty("hadoop.log.dir", new File(fs.getWorkingDirectory()
+                .toString(), "/logs").getAbsolutePath());
+        // LocalJobRunner does not work with mapreduce OutputCommitter. So need
+        // to use MiniMRCluster. MAPREDUCE-2350
+        mrCluster = new MiniMRCluster(1, fs.getUri().toString(), 1, null, null,
+                new JobConf(conf));
+        mrConf = mrCluster.createJobConf();
+
+        if (isServerRunning) {
+            return;
+        }
+
+        MetaStoreUtils.startMetaStore(msPort, ShimLoader
+                .getHadoopThriftAuthBridge());
+        isServerRunning = true;
+        securityManager = System.getSecurityManager();
+        System.setSecurityManager(new NoExitSecurityManager());
+
+        hcatConf = new HiveConf(TestHCatPartitionPublish.class);
+        hcatConf.set("hive.metastore.local", "false");
+        hcatConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:"
+                + msPort);
+        hcatConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTRETRIES, 3);
+
+        hcatConf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname,
+                HCatSemanticAnalyzer.class.getName());
+        hcatConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+        hcatConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+        hcatConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname,
+                "false");
+        msc = new HiveMetaStoreClient(hcatConf, null);
+        System.setProperty(HiveConf.ConfVars.PREEXECHOOKS.varname, " ");
+        System.setProperty(HiveConf.ConfVars.POSTEXECHOOKS.varname, " ");
+    }
+
+    @AfterClass
+    public static void tearDown() throws IOException {
+        if (mrCluster != null) {
+            mrCluster.shutdown();
+        }
+        System.setSecurityManager(securityManager);
+        isServerRunning = false;
+    }
+
+    @Test
+    public void testPartitionPublish() throws Exception {
+        String dbName = "default";
+        String tableName = "testHCatPartitionedTable";
+        createTable(null, tableName);
+
+        Map<String, String> partitionMap = new HashMap<String, String>();
+        partitionMap.put("part1", "p1value1");
+        partitionMap.put("part0", "p0value1");
+
+        ArrayList<HCatFieldSchema> hcatTableColumns = new ArrayList<HCatFieldSchema>();
+        for (FieldSchema fs : getTableColumns()) {
+            hcatTableColumns.add(HCatSchemaUtils.getHCatFieldSchema(fs));
+        }
+
+        runMRCreateFail(dbName, tableName, partitionMap, hcatTableColumns);
+        List<String> ptns = msc.listPartitionNames(dbName, tableName,
+                (short) 10);
+        Assert.assertEquals(0, ptns.size());
+        Table table = msc.getTable(dbName, tableName);
+        Assert.assertTrue(table != null);
+        // Also make sure that the directory has been deleted in the table
+        // location.
+        Assert.assertFalse(fs.exists(new Path(table.getSd().getLocation()
+                + "/part1=p1value1/part0=p0value1")));
+    }
+
+    void runMRCreateFail(
+        String dbName, String tableName, Map<String, String> partitionValues,
+        List<HCatFieldSchema> columns) throws Exception {
+
+        Job job = new Job(mrConf, "hcat mapreduce write fail test");
+        job.setJarByClass(this.getClass());
+        job.setMapperClass(TestHCatPartitionPublish.MapFail.class);
+
+        // input/output settings
+        job.setInputFormatClass(TextInputFormat.class);
+
+        Path path = new Path(fs.getWorkingDirectory(),
+                "mapred/testHCatMapReduceInput");
+        // The write count does not matter, as the map will fail in its first
+        // call.
+        createInputFile(path, 5);
+
+        TextInputFormat.setInputPaths(job, path);
+        job.setOutputFormatClass(HCatOutputFormat.class);
+        OutputJobInfo outputJobInfo = OutputJobInfo.create(dbName, tableName,
+                partitionValues);
+        HCatOutputFormat.setOutput(job, outputJobInfo);
+
+        job.setMapOutputKeyClass(BytesWritable.class);
+        job.setMapOutputValueClass(DefaultHCatRecord.class);
+
+        job.setNumReduceTasks(0);
+
+        HCatOutputFormat.setSchema(job, new HCatSchema(columns));
+
+        boolean success = job.waitForCompletion(true);
+        Assert.assertTrue(success == false);
+    }
+
+    private void createInputFile(Path path, int rowCount) throws IOException {
+        if (fs.exists(path)) {
+            fs.delete(path, true);
+        }
+        FSDataOutputStream os = fs.create(path);
+        for (int i = 0; i < rowCount; i++) {
+            os.writeChars(i + "\n");
+        }
+        os.close();
+    }
+
+    public static class MapFail extends
+            Mapper<LongWritable, Text, BytesWritable, HCatRecord> {
+
+        @Override
+        public void map(LongWritable key, Text value, Context context)
+            throws IOException, InterruptedException {
+            {
+                throw new IOException("Exception to mimic job failure.");
+            }
+        }
+    }
+
+    private void createTable(String dbName, String tableName) throws Exception {
+        String databaseName = (dbName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME
+                : dbName;
+        try {
+            msc.dropTable(databaseName, tableName);
+        } catch (Exception e) {
+        } // can fail with NoSuchObjectException
+
+        Table tbl = new Table();
+        tbl.setDbName(databaseName);
+        tbl.setTableName(tableName);
+        tbl.setTableType("MANAGED_TABLE");
+        StorageDescriptor sd = new StorageDescriptor();
+        sd.setCols(getTableColumns());
+        tbl.setPartitionKeys(getPartitionKeys());
+        tbl.setSd(sd);
+        sd.setBucketCols(new ArrayList<String>(2));
+        sd.setSerdeInfo(new SerDeInfo());
+        sd.getSerdeInfo().setName(tbl.getTableName());
+        sd.getSerdeInfo().setParameters(new HashMap<String, String>());
+        sd.getSerdeInfo().getParameters().put(
+                org.apache.hadoop.hive.serde.Constants.SERIALIZATION_FORMAT,
+                "1");
+        sd.getSerdeInfo().setSerializationLib(ColumnarSerDe.class.getName());
+        sd.setInputFormat(RCFileInputFormat.class.getName());
+        sd.setOutputFormat(RCFileOutputFormat.class.getName());
+
+        Map<String, String> tableParams = new HashMap<String, String>();
+        tbl.setParameters(tableParams);
+
+        msc.createTable(tbl);
+    }
+
+    protected List<FieldSchema> getPartitionKeys() {
+        List<FieldSchema> fields = new ArrayList<FieldSchema>();
+        // Defining partition names in unsorted order
+        fields.add(new FieldSchema("PaRT1", Constants.STRING_TYPE_NAME, ""));
+        fields.add(new FieldSchema("part0", Constants.STRING_TYPE_NAME, ""));
+        return fields;
+    }
+
+    protected List<FieldSchema> getTableColumns() {
+        List<FieldSchema> fields = new ArrayList<FieldSchema>();
+        fields.add(new FieldSchema("c1", Constants.INT_TYPE_NAME, ""));
+        fields.add(new FieldSchema("c2", Constants.STRING_TYPE_NAME, ""));
+        return fields;
+    }
+
+}



Mime
View raw message