incubator-hcatalog-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tra...@apache.org
Subject svn commit: r1387314 - in /incubator/hcatalog/trunk: ./ src/java/org/apache/hcatalog/mapreduce/ src/test/org/apache/hcatalog/mapreduce/
Date Tue, 18 Sep 2012 18:48:23 GMT
Author: travis
Date: Tue Sep 18 18:48:23 2012
New Revision: 1387314

URL: http://svn.apache.org/viewvc?rev=1387314&view=rev
Log:
HCATALOG-490 HCatStorer() throws error when the same partition key is present in records in
more than one task running as part of the same job

Modified:
    incubator/hcatalog/trunk/CHANGES.txt
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatBaseTest.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java

Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1387314&r1=1387313&r2=1387314&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Tue Sep 18 18:48:23 2012
@@ -109,6 +109,8 @@ Trunk (unreleased changes)
   OPTIMIZATIONS
 
   BUG FIXES
+  HCAT-490 HCatStorer() throws error when the same partition key is present in records in
more than one task running as part of the same job (amalakar via traviscrawford)
+
   HCAT-499 Multiple store commands does not work with Hadoop23 (rohinip via avandana)
 
   HCAT-501 HBase storage handler tests failing in trunk (traviscrawford)

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java?rev=1387314&r1=1387313&r2=1387314&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java
(original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java
Tue Sep 18 18:48:23 2012
@@ -195,8 +195,6 @@ class FileRecordWriterContainer extends 
                 //create base OutputFormat
                 org.apache.hadoop.mapred.OutputFormat baseOF =
                     ReflectionUtils.newInstance(storageHandler.getOutputFormatClass(), currTaskContext.getJobConf());
-                //check outputSpecs
-                baseOF.checkOutputSpecs(null, currTaskContext.getJobConf());
                 //get Output Committer
                 org.apache.hadoop.mapred.OutputCommitter baseOutputCommitter = currTaskContext.getJobConf().getOutputCommitter();
                 //create currJobContext the latest so it gets all the config changes

Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatBaseTest.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatBaseTest.java?rev=1387314&r1=1387313&r2=1387314&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatBaseTest.java (original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatBaseTest.java Tue
Sep 18 18:48:23 2012
@@ -72,10 +72,10 @@ public class HCatBaseTest {
      */
     protected void setUpHiveConf() {
         hiveConf = new HiveConf(this.getClass());
-        hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
-        hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
-        hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
-        hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, TEST_WAREHOUSE_DIR);
+        hiveConf.setVar(HiveConf.ConfVars.PREEXECHOOKS, "");
+        hiveConf.setVar(HiveConf.ConfVars.POSTEXECHOOKS, "");
+        hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+        hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, TEST_WAREHOUSE_DIR);
     }
 
     protected void logAndRegister(PigServer server, String query) throws IOException {

Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java?rev=1387314&r1=1387313&r2=1387314&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java
(original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java
Tue Sep 18 18:48:23 2012
@@ -25,25 +25,20 @@ import java.util.List;
 import java.util.Map;
 
 import junit.framework.Assert;
-import junit.framework.TestCase;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.cli.CliSessionState;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
 import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
-import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
@@ -55,22 +50,29 @@ import org.apache.hadoop.mapreduce.Mappe
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hcatalog.HcatTestUtils;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.DefaultHCatRecord;
 import org.apache.hcatalog.data.HCatRecord;
 import org.apache.hcatalog.data.schema.HCatFieldSchema;
 import org.apache.hcatalog.data.schema.HCatSchema;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.junit.Assert.assertTrue;
+
 /**
  * Test for HCatOutputFormat. Writes a partition using HCatOutputFormat and reads
  * it back using HCatInputFormat, checks the column values and counts.
  */
-public abstract class HCatMapReduceTest extends TestCase {
+public abstract class HCatMapReduceTest extends HCatBaseTest {
 
     private static final Logger LOG = LoggerFactory.getLogger(HCatMapReduceTest.class);
-    protected String dbName = "default";
-    protected String tableName = "testHCatMapReduceTable";
+    protected static String dbName = MetaStoreUtils.DEFAULT_DATABASE_NAME;
+    protected static String tableName = "testHCatMapReduceTable";
 
     protected String inputFormat = RCFileInputFormat.class.getName();
     protected String outputFormat = RCFileOutputFormat.class.getName();
@@ -79,52 +81,30 @@ public abstract class HCatMapReduceTest 
     private static List<HCatRecord> writeRecords = new ArrayList<HCatRecord>();
     private static List<HCatRecord> readRecords = new ArrayList<HCatRecord>();
 
-    protected abstract void initialize() throws Exception;
-
     protected abstract List<FieldSchema> getPartitionKeys();
 
     protected abstract List<FieldSchema> getTableColumns();
 
-    private HiveMetaStoreClient client;
-    protected HiveConf hiveConf;
-
-    private FileSystem fs;
-    private String thriftUri = null;
-
-    protected Driver driver;
-
-    @Override
-    protected void setUp() throws Exception {
-        hiveConf = new HiveConf(this.getClass());
-
-        //The default org.apache.hadoop.hive.ql.hooks.PreExecutePrinter hook
-        //is present only in the ql/test directory
-        hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
-        hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
-        hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
-        driver = new Driver(hiveConf);
-        SessionState.start(new CliSessionState(hiveConf));
-
-        thriftUri = System.getenv("HCAT_METASTORE_URI");
-
-        if (thriftUri != null) {
-            LOG.info("Using URI {}", thriftUri);
-
-            hiveConf.set("hive.metastore.local", "false");
-            hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, thriftUri);
-        }
+    private static FileSystem fs;
 
+    @BeforeClass
+    public static void setUpOneTime() throws Exception {
         fs = new LocalFileSystem();
         fs.initialize(fs.getWorkingDirectory().toUri(), new Configuration());
 
-        initialize();
+        HiveConf hiveConf = new HiveConf();
+        hiveConf.setInt(HCatConstants.HCAT_HIVE_CLIENT_EXPIRY_TIME, 0);
+        // Hack to initialize cache with 0 expiry time causing it to return a new hive client
every time
+        // Otherwise the cache doesn't play well with the second test method when the client
gets closed() in the
+        // tearDown() of the previous test
+        HCatUtil.getHiveClient(hiveConf);
 
-        client = new HiveMetaStoreClient(hiveConf, null);
-        initTable();
+        MapCreate.writeCount = 0;
+        MapRead.readCount = 0;
     }
 
-    @Override
-    protected void tearDown() throws Exception {
+    @After
+    public void deleteTable() throws Exception {
         try {
             String databaseName = (dbName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME
: dbName;
 
@@ -133,13 +113,10 @@ public abstract class HCatMapReduceTest 
             e.printStackTrace();
             throw e;
         }
-
-        client.close();
     }
 
-
-    private void initTable() throws Exception {
-
+    @Before
+    public void createTable() throws Exception {
         String databaseName = (dbName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME : dbName;
 
         try {
@@ -237,6 +214,23 @@ public abstract class HCatMapReduceTest 
     Job runMRCreate(Map<String, String> partitionValues,
                     List<HCatFieldSchema> partitionColumns, List<HCatRecord>
records,
                     int writeCount, boolean assertWrite) throws Exception {
+        return runMRCreate(partitionValues, partitionColumns, records, writeCount, assertWrite,
true);
+    }
+
+    /**
+     * Run a local map reduce job to load data from in memory records to an HCatalog Table
+     * @param partitionValues
+     * @param partitionColumns
+     * @param records data to be written to HCatalog table
+     * @param writeCount
+     * @param assertWrite
+     * @param asSingleMapTask
+     * @return
+     * @throws Exception
+     */
+    Job runMRCreate(Map<String, String> partitionValues,
+                    List<HCatFieldSchema> partitionColumns, List<HCatRecord>
records,
+                    int writeCount, boolean assertWrite, boolean asSingleMapTask) throws
Exception {
 
         writeRecords = records;
         MapCreate.writeCount = 0;
@@ -249,10 +243,22 @@ public abstract class HCatMapReduceTest 
         // input/output settings
         job.setInputFormatClass(TextInputFormat.class);
 
-        Path path = new Path(fs.getWorkingDirectory(), "mapred/testHCatMapReduceInput");
-        createInputFile(path, writeCount);
+        if (asSingleMapTask) {
+            // One input path would mean only one map task
+            Path path = new Path(fs.getWorkingDirectory(), "mapred/testHCatMapReduceInput");
+            createInputFile(path, writeCount);
+            TextInputFormat.setInputPaths(job, path);
+        } else {
+            // Create two input paths so that two map tasks get triggered. There could be
other ways
+            // to trigger two map tasks.
+            Path path = new Path(fs.getWorkingDirectory(), "mapred/testHCatMapReduceInput");
+            createInputFile(path, writeCount / 2);
 
-        TextInputFormat.setInputPaths(job, path);
+            Path path2 = new Path(fs.getWorkingDirectory(), "mapred/testHCatMapReduceInput2");
+            createInputFile(path2, (writeCount - writeCount / 2));
+
+            TextInputFormat.setInputPaths(job, path, path2);
+        }
 
         job.setOutputFormatClass(HCatOutputFormat.class);
 
@@ -294,6 +300,13 @@ public abstract class HCatMapReduceTest 
         return runMRRead(readCount, null);
     }
 
+    /**
+     * Run a local map reduce job to read records from HCatalog table and verify if the count
is as expected
+     * @param readCount
+     * @param filter
+     * @return
+     * @throws Exception
+     */
     List<HCatRecord> runMRRead(int readCount, String filter) throws Exception {
 
         MapRead.readCount = 0;

Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java?rev=1387314&r1=1387313&r2=1387314&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
(original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
Tue Sep 18 18:48:23 2012
@@ -34,31 +34,37 @@ import org.apache.hcatalog.data.DefaultH
 import org.apache.hcatalog.data.HCatRecord;
 import org.apache.hcatalog.data.schema.HCatFieldSchema;
 import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+import org.junit.BeforeClass;
+import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertTrue;
+
 public class TestHCatDynamicPartitioned extends HCatMapReduceTest {
 
-    private List<HCatRecord> writeRecords;
-    private List<HCatFieldSchema> dataColumns;
+    private static List<HCatRecord> writeRecords;
+    private static List<HCatFieldSchema> dataColumns;
     private static final Logger LOG = LoggerFactory.getLogger(TestHCatDynamicPartitioned.class);
+    private static final int NUM_RECORDS = 20;
+    private static final int NUM_PARTITIONS = 5;
 
-    @Override
-    protected void initialize() throws Exception {
-
+    @BeforeClass
+    public static void generateInputData() throws Exception {
         tableName = "testHCatDynamicPartitionedTable";
-        generateWriteRecords(20, 5, 0);
+        generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0);
         generateDataColumns();
     }
 
-    private void generateDataColumns() throws HCatException {
+    private static void generateDataColumns() throws HCatException {
         dataColumns = new ArrayList<HCatFieldSchema>();
         dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME,
"")));
         dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.STRING_TYPE_NAME,
"")));
         dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("p1", Constants.STRING_TYPE_NAME,
"")));
     }
 
-    private void generateWriteRecords(int max, int mod, int offset) {
+    private static void generateWriteRecords(int max, int mod, int offset) {
         writeRecords = new ArrayList<HCatRecord>();
 
         for (int i = 0; i < max; i++) {
@@ -86,13 +92,29 @@ public class TestHCatDynamicPartitioned 
         return fields;
     }
 
-
+    /**
+     * Run the dynamic partitioning test but with a single map task
+     * @throws Exception
+     */
+    @Test
     public void testHCatDynamicPartitionedTable() throws Exception {
+        runHCatDynamicPartitionedTable(true);
+    }
+
+    /**
+     * Run the dynamic partitioning test but with multiple map tasks. See HCATALOG-490
+     * @throws Exception
+     */
+    @Test
+    public void testHCatDynamicPartitionedTableMultipleTask() throws Exception {
+        runHCatDynamicPartitionedTable(false);
+    }
 
-        generateWriteRecords(20, 5, 0);
-        runMRCreate(null, dataColumns, writeRecords, 20, true);
+    protected void runHCatDynamicPartitionedTable(boolean asSingleMapTask) throws Exception
{
+        generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0);
+        runMRCreate(null, dataColumns, writeRecords, NUM_RECORDS, true, asSingleMapTask);
 
-        runMRRead(20);
+        runMRRead(NUM_RECORDS);
 
         //Read with partition filter
         runMRRead(4, "p1 = \"0\"");
@@ -110,14 +132,14 @@ public class TestHCatDynamicPartitioned 
 
         ArrayList<String> res = new ArrayList<String>();
         driver.getResults(res);
-        assertEquals(20, res.size());
+        assertEquals(NUM_RECORDS, res.size());
 
 
         //Test for duplicate publish
         IOException exc = null;
         try {
-            generateWriteRecords(20, 5, 0);
-            Job job = runMRCreate(null, dataColumns, writeRecords, 20, false);
+            generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0);
+            Job job = runMRCreate(null, dataColumns, writeRecords, NUM_RECORDS, false);
             if (HcatTestUtils.isHadoop23()) {
                 new FileOutputCommitterContainer(job, null).cleanupJob(job);
             }

Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java?rev=1387314&r1=1387313&r2=1387314&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java
(original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java
Tue Sep 18 18:48:23 2012
@@ -32,14 +32,19 @@ import org.apache.hcatalog.data.DefaultH
 import org.apache.hcatalog.data.HCatRecord;
 import org.apache.hcatalog.data.schema.HCatFieldSchema;
 import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static junit.framework.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class TestHCatNonPartitioned extends HCatMapReduceTest {
 
-    private List<HCatRecord> writeRecords;
-    List<HCatFieldSchema> partitionColumns;
+    private static List<HCatRecord> writeRecords;
+    static List<HCatFieldSchema> partitionColumns;
 
-    @Override
-    protected void initialize() throws HCatException {
+    @BeforeClass
+    public static void oneTimeSetUp() throws Exception {
 
         dbName = null; //test if null dbName works ("default" is used)
         tableName = "testHCatNonPartitionedTable";
@@ -75,6 +80,7 @@ public class TestHCatNonPartitioned exte
     }
 
 
+    @Test
     public void testHCatNonPartitionedTable() throws Exception {
 
         Map<String, String> partitionMap = new HashMap<String, String>();

Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java?rev=1387314&r1=1387313&r2=1387314&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java
(original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java
Tue Sep 18 18:48:23 2012
@@ -33,14 +33,19 @@ import org.apache.hcatalog.data.HCatReco
 import org.apache.hcatalog.data.schema.HCatFieldSchema;
 import org.apache.hcatalog.data.schema.HCatSchema;
 import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static junit.framework.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class TestHCatPartitioned extends HCatMapReduceTest {
 
-    private List<HCatRecord> writeRecords;
-    private List<HCatFieldSchema> partitionColumns;
+    private static List<HCatRecord> writeRecords;
+    private static List<HCatFieldSchema> partitionColumns;
 
-    @Override
-    protected void initialize() throws Exception {
+    @BeforeClass
+    public static void oneTimeSetUp() throws Exception {
 
         tableName = "testHCatPartitionedTable";
         writeRecords = new ArrayList<HCatRecord>();
@@ -77,6 +82,7 @@ public class TestHCatPartitioned extends
     }
 
 
+    @Test
     public void testHCatPartitionedTable() throws Exception {
 
         Map<String, String> partitionMap = new HashMap<String, String>();



Mime
View raw message