From hcatalog-commits-return-1074-apmail-incubator-hcatalog-commits-archive=incubator.apache.org@incubator.apache.org Fri Sep 21 18:37:53 2012 Return-Path: X-Original-To: apmail-incubator-hcatalog-commits-archive@minotaur.apache.org Delivered-To: apmail-incubator-hcatalog-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 503B1D2D1 for ; Fri, 21 Sep 2012 18:37:53 +0000 (UTC) Received: (qmail 13685 invoked by uid 500); 21 Sep 2012 18:37:53 -0000 Delivered-To: apmail-incubator-hcatalog-commits-archive@incubator.apache.org Received: (qmail 13638 invoked by uid 500); 21 Sep 2012 18:37:52 -0000 Mailing-List: contact hcatalog-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hcatalog-dev@incubator.apache.org Delivered-To: mailing list hcatalog-commits@incubator.apache.org Received: (qmail 13629 invoked by uid 99); 21 Sep 2012 18:37:52 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 21 Sep 2012 18:37:52 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 21 Sep 2012 18:37:48 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id DF29E238890B; Fri, 21 Sep 2012 18:37:05 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1388607 - in /incubator/hcatalog/branches/branch-0.4: ./ src/java/org/apache/hcatalog/mapreduce/ src/test/org/apache/hcatalog/mapreduce/ Date: Fri, 21 Sep 2012 18:37:05 -0000 To: hcatalog-commits@incubator.apache.org From: toffer@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120921183705.DF29E238890B@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: toffer Date: Fri Sep 21 18:37:05 2012 New Revision: 1388607 URL: http://svn.apache.org/viewvc?rev=1388607&view=rev Log: merged from trunk: HCAT-490 HCatStorer() throws error when the same partition key is present in records in more than one tasks running as part of the same job (amalakar via toffer) Modified: incubator/hcatalog/branches/branch-0.4/CHANGES.txt incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java Modified: incubator/hcatalog/branches/branch-0.4/CHANGES.txt URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/CHANGES.txt?rev=1388607&r1=1388606&r2=1388607&view=diff ============================================================================== --- incubator/hcatalog/branches/branch-0.4/CHANGES.txt (original) +++ incubator/hcatalog/branches/branch-0.4/CHANGES.txt Fri Sep 21 18:37:05 2012 @@ -63,6 +63,8 @@ Trunk (unreleased changes) OPTIMIZATIONS BUG FIXES + HCAT-490 HCatStorer() throws error when the same partition key is present in records in more than one tasks running as part of the same job (amalakar via toffer) + HCAT-494 MultiOutputFormat in 0.23 fails to setAliasConf() correctly. (mithun via toffer) HCAT-507 e2e harness failing on 0.23 (toffer) Modified: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java?rev=1388607&r1=1388606&r2=1388607&view=diff ============================================================================== --- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java (original) +++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java Fri Sep 21 18:37:05 2012 @@ -82,17 +82,17 @@ class FileRecordWriterContainer extends */ public FileRecordWriterContainer(org.apache.hadoop.mapred.RecordWriter, ? super Writable> baseWriter, TaskAttemptContext context) throws IOException, InterruptedException { - super(context,baseWriter); + super(context, baseWriter); this.context = context; jobInfo = HCatOutputFormat.getJobInfo(context); storageHandler = HCatUtil.getStorageHandler(context.getConfiguration(), jobInfo.getTableInfo().getStorerInfo()); - serDe = ReflectionUtils.newInstance(storageHandler.getSerDeClass(),context.getConfiguration()); + serDe = ReflectionUtils.newInstance(storageHandler.getSerDeClass(), context.getConfiguration()); objectInspector = InternalUtil.createStructObjectInspector(jobInfo.getOutputSchema()); try { InternalUtil.initializeOutputSerDe(serDe, context.getConfiguration(), jobInfo); } catch (SerDeException e) { - throw new IOException("Failed to inialize SerDe",e); + throw new IOException("Failed to inialize SerDe", e); } // If partition columns occur in data, we want to remove them. @@ -101,9 +101,9 @@ class FileRecordWriterContainer extends dynamicPartCols = jobInfo.getPosOfDynPartCols(); maxDynamicPartitions = jobInfo.getMaxDynamicPartitions(); - if((partColsToDel == null) || (dynamicPartitioningUsed && (dynamicPartCols == null))){ + if ((partColsToDel == null) || (dynamicPartitioningUsed && (dynamicPartCols == null))) { throw new HCatException("It seems that setSchema() is not called on " + - "HCatOutputFormat. Please make sure that method is called."); + "HCatOutputFormat. Please make sure that method is called."); } @@ -114,10 +114,9 @@ class FileRecordWriterContainer extends this.dynamicContexts = null; this.dynamicObjectInspectors = null; this.dynamicOutputJobInfo = null; - } - else { - this.baseDynamicSerDe = new HashMap(); - this.baseDynamicWriters = new HashMap, ? super Writable>>(); + } else { + this.baseDynamicSerDe = new HashMap(); + this.baseDynamicWriters = new HashMap, ? super Writable>>(); this.baseDynamicCommitters = new HashMap(); this.dynamicContexts = new HashMap(); this.dynamicObjectInspectors = new HashMap(); @@ -134,17 +133,17 @@ class FileRecordWriterContainer extends @Override public void close(TaskAttemptContext context) throws IOException, - InterruptedException { + InterruptedException { Reporter reporter = InternalUtil.createReporter(context); - if (dynamicPartitioningUsed){ - for (org.apache.hadoop.mapred.RecordWriter, ? super Writable> bwriter : baseDynamicWriters.values()){ + if (dynamicPartitioningUsed) { + for (org.apache.hadoop.mapred.RecordWriter, ? super Writable> bwriter : baseDynamicWriters.values()) { //We are in RecordWriter.close() make sense that the context would be TaskInputOutput bwriter.close(reporter); } - for(Map.Entryentry : baseDynamicCommitters.entrySet()) { + for (Map.Entry entry : baseDynamicCommitters.entrySet()) { org.apache.hadoop.mapred.TaskAttemptContext currContext = dynamicContexts.get(entry.getKey()); OutputCommitter baseOutputCommitter = entry.getValue(); - if (baseOutputCommitter.needsTaskCommit(currContext)){ + if (baseOutputCommitter.needsTaskCommit(currContext)) { baseOutputCommitter.commitTask(currContext); } org.apache.hadoop.mapred.JobContext currJobContext = HCatMapRedUtil.createJobContext(currContext); @@ -157,92 +156,96 @@ class FileRecordWriterContainer extends @Override public void write(WritableComparable key, HCatRecord value) throws IOException, - InterruptedException { + InterruptedException { org.apache.hadoop.mapred.RecordWriter localWriter; ObjectInspector localObjectInspector; SerDe localSerDe; OutputJobInfo localJobInfo = null; - if (dynamicPartitioningUsed){ + if (dynamicPartitioningUsed) { // calculate which writer to use from the remaining values - this needs to be done before we delete cols List dynamicPartValues = new ArrayList(); - for (Integer colToAppend : dynamicPartCols){ + for (Integer colToAppend : dynamicPartCols) { dynamicPartValues.add(value.get(colToAppend).toString()); } String dynKey = dynamicPartValues.toString(); - if (!baseDynamicWriters.containsKey(dynKey)){ - if ((maxDynamicPartitions != -1) && (baseDynamicWriters.size() > maxDynamicPartitions)){ + if (!baseDynamicWriters.containsKey(dynKey)) { + if ((maxDynamicPartitions != -1) && (baseDynamicWriters.size() > maxDynamicPartitions)) { throw new HCatException(ErrorType.ERROR_TOO_MANY_DYNAMIC_PTNS, - "Number of dynamic partitions being created " - + "exceeds configured max allowable partitions[" - + maxDynamicPartitions - + "], increase parameter [" - + HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname - + "] if needed."); + "Number of dynamic partitions being created " + + "exceeds configured max allowable partitions[" + + maxDynamicPartitions + + "], increase parameter [" + + HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname + + "] if needed."); } org.apache.hadoop.mapred.TaskAttemptContext currTaskContext = HCatMapRedUtil.createTaskAttemptContext(context); configureDynamicStorageHandler(currTaskContext, dynamicPartValues); - localJobInfo= HCatBaseOutputFormat.getJobInfo(currTaskContext); + localJobInfo = HCatBaseOutputFormat.getJobInfo(currTaskContext); //setup serDe SerDe currSerDe = ReflectionUtils.newInstance(storageHandler.getSerDeClass(), currTaskContext.getJobConf()); try { InternalUtil.initializeOutputSerDe(currSerDe, currTaskContext.getConfiguration(), localJobInfo); } catch (SerDeException e) { - throw new IOException("Failed to initialize SerDe",e); + throw new IOException("Failed to initialize SerDe", e); } //create base OutputFormat org.apache.hadoop.mapred.OutputFormat baseOF = - ReflectionUtils.newInstance(storageHandler.getOutputFormatClass(), currTaskContext.getJobConf()); - //check outputSpecs - baseOF.checkOutputSpecs(null,currTaskContext.getJobConf()); + ReflectionUtils.newInstance(storageHandler.getOutputFormatClass(), currTaskContext.getJobConf()); //get Output Committer - org.apache.hadoop.mapred.OutputCommitter baseOutputCommitter = currTaskContext.getJobConf().getOutputCommitter(); + org.apache.hadoop.mapred.OutputCommitter baseOutputCommitter = currTaskContext.getJobConf().getOutputCommitter(); //create currJobContext the latest so it gets all the config changes org.apache.hadoop.mapred.JobContext currJobContext = HCatMapRedUtil.createJobContext(currTaskContext); + + //We are skipping calling checkOutputSpecs() for each partition + //As it can throw a FileAlreadyExistsException when more than one mapper is writing to a partition + //See HCATALOG-490, also to avoid contacting the namenode for each new FileOutputFormat instance + //In general this should be ok for most FileOutputFormat implementations + //but may become an issue for cases when the method is used to perform other setup tasks + //setupJob() baseOutputCommitter.setupJob(currJobContext); //recreate to refresh jobConf of currTask context currTaskContext = - HCatMapRedUtil.createTaskAttemptContext(currJobContext.getJobConf(), - currTaskContext.getTaskAttemptID(), - currTaskContext.getProgressible()); + HCatMapRedUtil.createTaskAttemptContext(currJobContext.getJobConf(), + currTaskContext.getTaskAttemptID(), + currTaskContext.getProgressible()); //set temp location currTaskContext.getConfiguration().set("mapred.work.output.dir", - new FileOutputCommitter(new Path(localJobInfo.getLocation()),currTaskContext).getWorkPath().toString()); + new FileOutputCommitter(new Path(localJobInfo.getLocation()), currTaskContext).getWorkPath().toString()); //setupTask() baseOutputCommitter.setupTask(currTaskContext); org.apache.hadoop.mapred.RecordWriter baseRecordWriter = - baseOF.getRecordWriter(null, - currTaskContext.getJobConf(), - FileOutputFormat.getUniqueFile(currTaskContext, "part", ""), - InternalUtil.createReporter(currTaskContext)); + baseOF.getRecordWriter(null, + currTaskContext.getJobConf(), + FileOutputFormat.getUniqueFile(currTaskContext, "part", ""), + InternalUtil.createReporter(currTaskContext)); baseDynamicWriters.put(dynKey, baseRecordWriter); - baseDynamicSerDe.put(dynKey,currSerDe); - baseDynamicCommitters.put(dynKey,baseOutputCommitter); - dynamicContexts.put(dynKey,currTaskContext); - dynamicObjectInspectors.put(dynKey,InternalUtil.createStructObjectInspector(jobInfo.getOutputSchema())); + baseDynamicSerDe.put(dynKey, currSerDe); + baseDynamicCommitters.put(dynKey, baseOutputCommitter); + dynamicContexts.put(dynKey, currTaskContext); + dynamicObjectInspectors.put(dynKey, InternalUtil.createStructObjectInspector(jobInfo.getOutputSchema())); dynamicOutputJobInfo.put(dynKey, HCatOutputFormat.getJobInfo(dynamicContexts.get(dynKey))); } - + localJobInfo = dynamicOutputJobInfo.get(dynKey); localWriter = baseDynamicWriters.get(dynKey); localSerDe = baseDynamicSerDe.get(dynKey); localObjectInspector = dynamicObjectInspectors.get(dynKey); - } - else{ + } else { localJobInfo = jobInfo; localWriter = getBaseRecordWriter(); localSerDe = serDe; localObjectInspector = objectInspector; } - for(Integer colToDel : partColsToDel){ + for (Integer colToDel : partColsToDel) { value.remove(colToDel); } @@ -251,7 +254,7 @@ class FileRecordWriterContainer extends try { localWriter.write(NullWritable.get(), localSerDe.serialize(value.getAll(), localObjectInspector)); } catch (SerDeException e) { - throw new IOException("Failed to serialize object",e); + throw new IOException("Failed to serialize object", e); } } Modified: incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java?rev=1388607&r1=1388606&r2=1388607&view=diff ============================================================================== --- incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java (original) +++ incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java Fri Sep 21 18:37:05 2012 @@ -25,328 +25,342 @@ import java.util.List; import java.util.Map; import junit.framework.Assert; -import junit.framework.TestCase; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.cli.CliSessionState; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.SerDeInfo; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.io.RCFileInputFormat; import org.apache.hadoop.hive.ql.io.RCFileOutputFormat; -import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobStatus; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hcatalog.HcatTestUtils; +import org.apache.hcatalog.common.HCatConstants; +import org.apache.hcatalog.common.HCatUtil; import org.apache.hcatalog.data.DefaultHCatRecord; import org.apache.hcatalog.data.HCatRecord; import org.apache.hcatalog.data.schema.HCatFieldSchema; import org.apache.hcatalog.data.schema.HCatSchema; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.junit.Assert.assertTrue; + /** * Test for HCatOutputFormat. Writes a partition using HCatOutputFormat and reads * it back using HCatInputFormat, checks the column values and counts. */ -public abstract class HCatMapReduceTest extends TestCase { - - private static final Logger LOG = LoggerFactory.getLogger(HCatMapReduceTest.class); - protected String dbName = "default"; - protected String tableName = "testHCatMapReduceTable"; - - protected String inputFormat = RCFileInputFormat.class.getName(); - protected String outputFormat = RCFileOutputFormat.class.getName(); - protected String serdeClass = ColumnarSerDe.class.getName(); - - private static List writeRecords = new ArrayList(); - private static List readRecords = new ArrayList(); - - protected abstract void initialize() throws Exception; - - protected abstract List getPartitionKeys(); - - protected abstract List getTableColumns(); +public abstract class HCatMapReduceTest extends HCatBaseTest { - private HiveMetaStoreClient client; - protected HiveConf hiveConf; + private static final Logger LOG = LoggerFactory.getLogger(HCatMapReduceTest.class); + protected static String dbName = MetaStoreUtils.DEFAULT_DATABASE_NAME; + protected static String tableName = "testHCatMapReduceTable"; - private FileSystem fs; - private String thriftUri = null; + protected String inputFormat = RCFileInputFormat.class.getName(); + protected String outputFormat = RCFileOutputFormat.class.getName(); + protected String serdeClass = ColumnarSerDe.class.getName(); - protected Driver driver; + private static List writeRecords = new ArrayList(); + private static List readRecords = new ArrayList(); - @Override - protected void setUp() throws Exception { - hiveConf = new HiveConf(this.getClass()); + protected abstract List getPartitionKeys(); - //The default org.apache.hadoop.hive.ql.hooks.PreExecutePrinter hook - //is present only in the ql/test directory - hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); - hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); - hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); - driver = new Driver(hiveConf); - SessionState.start(new CliSessionState(hiveConf)); + protected abstract List getTableColumns(); - thriftUri = System.getenv("HCAT_METASTORE_URI"); + private static FileSystem fs; - if( thriftUri != null ) { - LOG.info("Using URI {}", thriftUri); - - hiveConf.set("hive.metastore.local", "false"); - hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, thriftUri); + @BeforeClass + public static void setUpOneTime() throws Exception { + fs = new LocalFileSystem(); + fs.initialize(fs.getWorkingDirectory().toUri(), new Configuration()); + MapCreate.writeCount = 0; + MapRead.readCount = 0; } - fs = new LocalFileSystem(); - fs.initialize(fs.getWorkingDirectory().toUri(), new Configuration()); - - initialize(); - - client = new HiveMetaStoreClient(hiveConf, null); - initTable(); - } - - @Override - protected void tearDown() throws Exception { - try { - String databaseName = (dbName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME : dbName; + @After + public void deleteTable() throws Exception { + try { + String databaseName = (dbName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME : dbName; - client.dropTable(databaseName, tableName); - } catch(Exception e) { - e.printStackTrace(); - throw e; + client.dropTable(databaseName, tableName); + } catch (Exception e) { + e.printStackTrace(); + throw e; + } } - client.close(); - } - - - - private void initTable() throws Exception { - - String databaseName = (dbName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME : dbName; - - try { - client.dropTable(databaseName, tableName); - } catch(Exception e) { - } //can fail with NoSuchObjectException - - - Table tbl = new Table(); - tbl.setDbName(databaseName); - tbl.setTableName(tableName); - tbl.setTableType("MANAGED_TABLE"); - StorageDescriptor sd = new StorageDescriptor(); - - sd.setCols(getTableColumns()); - tbl.setPartitionKeys(getPartitionKeys()); - - tbl.setSd(sd); - - sd.setBucketCols(new ArrayList(2)); - sd.setSerdeInfo(new SerDeInfo()); - sd.getSerdeInfo().setName(tbl.getTableName()); - sd.getSerdeInfo().setParameters(new HashMap()); - sd.getSerdeInfo().getParameters().put( - org.apache.hadoop.hive.serde.Constants.SERIALIZATION_FORMAT, "1"); - sd.getSerdeInfo().setSerializationLib(serdeClass); - sd.setInputFormat(inputFormat); - sd.setOutputFormat(outputFormat); + @Before + public void disableHiveClientCache() throws IOException, MetaException { + HiveConf hiveConf = new HiveConf(); + hiveConf.setInt(HCatConstants.HCAT_HIVE_CLIENT_EXPIRY_TIME, 0); + // Hack to initialize cache with 0 expiry time causing it to return a new hive client every time + // Otherwise the cache doesn't play well with the second test method with the client gets closed() in the + // tearDown() of the previous test + HCatUtil.getHiveClient(hiveConf); + } - Map tableParams = new HashMap(); - tbl.setParameters(tableParams); + @Before + public void createTable() throws Exception { + String databaseName = (dbName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME : dbName; - client.createTable(tbl); - } + try { + client.dropTable(databaseName, tableName); + } catch (Exception e) { + } //can fail with NoSuchObjectException + + + Table tbl = new Table(); + tbl.setDbName(databaseName); + tbl.setTableName(tableName); + tbl.setTableType("MANAGED_TABLE"); + StorageDescriptor sd = new StorageDescriptor(); + + sd.setCols(getTableColumns()); + tbl.setPartitionKeys(getPartitionKeys()); + + tbl.setSd(sd); + + sd.setBucketCols(new ArrayList(2)); + sd.setSerdeInfo(new SerDeInfo()); + sd.getSerdeInfo().setName(tbl.getTableName()); + sd.getSerdeInfo().setParameters(new HashMap()); + sd.getSerdeInfo().getParameters().put( + org.apache.hadoop.hive.serde.Constants.SERIALIZATION_FORMAT, "1"); + sd.getSerdeInfo().setSerializationLib(serdeClass); + sd.setInputFormat(inputFormat); + sd.setOutputFormat(outputFormat); - //Create test input file with specified number of rows - private void createInputFile(Path path, int rowCount) throws IOException { + Map tableParams = new HashMap(); + tbl.setParameters(tableParams); - if( fs.exists(path) ) { - fs.delete(path, true); + client.createTable(tbl); } - FSDataOutputStream os = fs.create(path); + //Create test input file with specified number of rows + private void createInputFile(Path path, int rowCount) throws IOException { - for(int i = 0;i < rowCount;i++) { - os.writeChars(i + "\n"); - } + if (fs.exists(path)) { + fs.delete(path, true); + } - os.close(); - } + FSDataOutputStream os = fs.create(path); - public static class MapCreate extends - Mapper { + for (int i = 0; i < rowCount; i++) { + os.writeChars(i + "\n"); + } - static int writeCount = 0; //test will be in local mode + os.close(); + } - @Override - public void map(LongWritable key, Text value, Context context - ) throws IOException, InterruptedException { - { - try { - HCatRecord rec = writeRecords.get(writeCount); - context.write(null, rec); - writeCount++; + public static class MapCreate extends + Mapper { - }catch(Exception e) { + static int writeCount = 0; //test will be in local mode - e.printStackTrace(System.err); //print since otherwise exception is lost - throw new IOException(e); + @Override + public void map(LongWritable key, Text value, Context context + ) throws IOException, InterruptedException { + { + try { + HCatRecord rec = writeRecords.get(writeCount); + context.write(null, rec); + writeCount++; + + } catch (Exception e) { + + e.printStackTrace(System.err); //print since otherwise exception is lost + throw new IOException(e); + } + } } - } } - } - public static class MapRead extends - Mapper { + public static class MapRead extends + Mapper { - static int readCount = 0; //test will be in local mode + static int readCount = 0; //test will be in local mode - @Override - public void map(WritableComparable key, HCatRecord value, Context context - ) throws IOException, InterruptedException { - { - try { - readRecords.add(value); - readCount++; - } catch(Exception e) { - e.printStackTrace(); //print since otherwise exception is lost - throw new IOException(e); + @Override + public void map(WritableComparable key, HCatRecord value, Context context + ) throws IOException, InterruptedException { + { + try { + readRecords.add(value); + readCount++; + } catch (Exception e) { + e.printStackTrace(); //print since otherwise exception is lost + throw new IOException(e); + } + } } - } } - } - Job runMRCreate(Map partitionValues, - List partitionColumns, List records, - int writeCount, boolean assertWrite) throws Exception { + Job runMRCreate(Map partitionValues, + List partitionColumns, List records, + int writeCount, boolean assertWrite) throws Exception { + return runMRCreate(partitionValues, partitionColumns, records, writeCount, assertWrite, true); + } - writeRecords = records; - MapCreate.writeCount = 0; + /** + * Run a local map reduce job to load data from in memory records to an HCatalog Table + * @param partitionValues + * @param partitionColumns + * @param records data to be written to HCatalog table + * @param writeCount + * @param assertWrite + * @param asSingleMapTask + * @return + * @throws Exception + */ + Job runMRCreate(Map partitionValues, + List partitionColumns, List records, + int writeCount, boolean assertWrite, boolean asSingleMapTask) throws Exception { + + writeRecords = records; + MapCreate.writeCount = 0; + + Configuration conf = new Configuration(); + Job job = new Job(conf, "hcat mapreduce write test"); + job.setJarByClass(this.getClass()); + job.setMapperClass(HCatMapReduceTest.MapCreate.class); + + // input/output settings + job.setInputFormatClass(TextInputFormat.class); + + if (asSingleMapTask) { + // One input path would mean only one map task + Path path = new Path(fs.getWorkingDirectory(), "mapred/testHCatMapReduceInput"); + createInputFile(path, writeCount); + TextInputFormat.setInputPaths(job, path); + } else { + // Create two input paths so that two map tasks get triggered. There could be other ways + // to trigger two map tasks. + Path path = new Path(fs.getWorkingDirectory(), "mapred/testHCatMapReduceInput"); + createInputFile(path, writeCount / 2); - Configuration conf = new Configuration(); - Job job = new Job(conf, "hcat mapreduce write test"); - job.setJarByClass(this.getClass()); - job.setMapperClass(HCatMapReduceTest.MapCreate.class); + Path path2 = new Path(fs.getWorkingDirectory(), "mapred/testHCatMapReduceInput2"); + createInputFile(path2, (writeCount - writeCount / 2)); - // input/output settings - job.setInputFormatClass(TextInputFormat.class); + TextInputFormat.setInputPaths(job, path, path2); + } - Path path = new Path(fs.getWorkingDirectory(), "mapred/testHCatMapReduceInput"); - createInputFile(path, writeCount); + job.setOutputFormatClass(HCatOutputFormat.class); - TextInputFormat.setInputPaths(job, path); + OutputJobInfo outputJobInfo = OutputJobInfo.create(dbName, tableName, partitionValues); + HCatOutputFormat.setOutput(job, outputJobInfo); - job.setOutputFormatClass(HCatOutputFormat.class); + job.setMapOutputKeyClass(BytesWritable.class); + job.setMapOutputValueClass(DefaultHCatRecord.class); - OutputJobInfo outputJobInfo = OutputJobInfo.create(dbName, tableName, partitionValues); - HCatOutputFormat.setOutput(job, outputJobInfo); + job.setNumReduceTasks(0); - job.setMapOutputKeyClass(BytesWritable.class); - job.setMapOutputValueClass(DefaultHCatRecord.class); + HCatOutputFormat.setSchema(job, new HCatSchema(partitionColumns)); - job.setNumReduceTasks(0); + boolean success = job.waitForCompletion(true); - HCatOutputFormat.setSchema(job, new HCatSchema(partitionColumns)); + // Ensure counters are set when data has actually been read. + if (partitionValues != null) { + assertTrue(job.getCounters().getGroup("FileSystemCounters") + .findCounter("FILE_BYTES_READ").getValue() > 0); + } - boolean success = job.waitForCompletion(true); + if (!HcatTestUtils.isHadoop23()) { + // Local mode outputcommitter hook is not invoked in Hadoop 1.x + if (success) { + new FileOutputCommitterContainer(job, null).commitJob(job); + } else { + new FileOutputCommitterContainer(job, null).abortJob(job, JobStatus.State.FAILED); + } + } + if (assertWrite) { + // we assert only if we expected to assert with this call. + Assert.assertEquals(writeCount, MapCreate.writeCount); + } - // Ensure counters are set when data has actually been read. - if (partitionValues != null) { - assertTrue(job.getCounters().getGroup("FileSystemCounters") - .findCounter("FILE_BYTES_READ").getValue() > 0); + return job; } - if (!HcatTestUtils.isHadoop23()) { - // Local mode outputcommitter hook is not invoked in Hadoop 1.x - if (success) { - new FileOutputCommitterContainer(job,null).commitJob(job); - } else { - new FileOutputCommitterContainer(job,null).abortJob(job, JobStatus.State.FAILED); - } + List runMRRead(int readCount) throws Exception { + return runMRRead(readCount, null); } - if (assertWrite){ - // we assert only if we expected to assert with this call. - Assert.assertEquals(writeCount, MapCreate.writeCount); - } - - return job; - } - - List runMRRead(int readCount) throws Exception { - return runMRRead(readCount, null); - } - - List runMRRead(int readCount, String filter) throws Exception { - - MapRead.readCount = 0; - readRecords.clear(); - - Configuration conf = new Configuration(); - Job job = new Job(conf, "hcat mapreduce read test"); - job.setJarByClass(this.getClass()); - job.setMapperClass(HCatMapReduceTest.MapRead.class); - // input/output settings - job.setInputFormatClass(HCatInputFormat.class); - job.setOutputFormatClass(TextOutputFormat.class); - - InputJobInfo inputJobInfo = InputJobInfo.create(dbName,tableName,filter); - HCatInputFormat.setInput(job, inputJobInfo); + /** + * Run a local map reduce job to read records from HCatalog table and verify if the count is as expected + * @param readCount + * @param filter + * @return + * @throws Exception + */ + List runMRRead(int readCount, String filter) throws Exception { + + MapRead.readCount = 0; + readRecords.clear(); + + Configuration conf = new Configuration(); + Job job = new Job(conf, "hcat mapreduce read test"); + job.setJarByClass(this.getClass()); + job.setMapperClass(HCatMapReduceTest.MapRead.class); + + // input/output settings + job.setInputFormatClass(HCatInputFormat.class); + job.setOutputFormatClass(TextOutputFormat.class); + + InputJobInfo inputJobInfo = InputJobInfo.create(dbName, tableName, filter); + HCatInputFormat.setInput(job, inputJobInfo); + + job.setMapOutputKeyClass(BytesWritable.class); + job.setMapOutputValueClass(Text.class); + + job.setNumReduceTasks(0); + + Path path = new Path(fs.getWorkingDirectory(), "mapred/testHCatMapReduceOutput"); + if (fs.exists(path)) { + fs.delete(path, true); + } - job.setMapOutputKeyClass(BytesWritable.class); - job.setMapOutputValueClass(Text.class); + TextOutputFormat.setOutputPath(job, path); - job.setNumReduceTasks(0); + job.waitForCompletion(true); + Assert.assertEquals(readCount, MapRead.readCount); - Path path = new Path(fs.getWorkingDirectory(), "mapred/testHCatMapReduceOutput"); - if( fs.exists(path) ) { - fs.delete(path, true); + return readRecords; } - TextOutputFormat.setOutputPath(job, path); - - job.waitForCompletion(true); - Assert.assertEquals(readCount, MapRead.readCount); - return readRecords; - } + protected HCatSchema getTableSchema() throws Exception { + Configuration conf = new Configuration(); + Job job = new Job(conf, "hcat mapreduce read schema test"); + job.setJarByClass(this.getClass()); - protected HCatSchema getTableSchema() throws Exception { + // input/output settings + job.setInputFormatClass(HCatInputFormat.class); + job.setOutputFormatClass(TextOutputFormat.class); - Configuration conf = new Configuration(); - Job job = new Job(conf, "hcat mapreduce read schema test"); - job.setJarByClass(this.getClass()); + InputJobInfo inputJobInfo = InputJobInfo.create(dbName, tableName, null); + HCatInputFormat.setInput(job, inputJobInfo); - // input/output settings - job.setInputFormatClass(HCatInputFormat.class); - job.setOutputFormatClass(TextOutputFormat.class); - - InputJobInfo inputJobInfo = InputJobInfo.create(dbName,tableName,null); - HCatInputFormat.setInput(job, inputJobInfo); - - return HCatInputFormat.getTableSchema(job); - } + return HCatInputFormat.getTableSchema(job); + } } Modified: incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java?rev=1388607&r1=1388606&r2=1388607&view=diff ============================================================================== --- incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java (original) +++ incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java Fri Sep 21 18:37:05 2012 @@ -34,129 +34,151 @@ import org.apache.hcatalog.data.DefaultH import org.apache.hcatalog.data.HCatRecord; import org.apache.hcatalog.data.schema.HCatFieldSchema; import org.apache.hcatalog.data.schema.HCatSchemaUtils; +import org.junit.BeforeClass; +import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertTrue; + public class TestHCatDynamicPartitioned extends HCatMapReduceTest { - private List writeRecords; - private List dataColumns; - private static final Logger LOG = LoggerFactory.getLogger(TestHCatDynamicPartitioned.class); - - @Override - protected void initialize() throws Exception { - - tableName = "testHCatDynamicPartitionedTable"; - generateWriteRecords(20,5,0); - generateDataColumns(); - } - - private void generateDataColumns() throws HCatException { - dataColumns = new ArrayList(); - dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, ""))); - dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.STRING_TYPE_NAME, ""))); - dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("p1", Constants.STRING_TYPE_NAME, ""))); - } - - private void generateWriteRecords(int max, int mod,int offset) { - writeRecords = new ArrayList(); - - for(int i = 0;i < max;i++) { - List objList = new ArrayList(); - - objList.add(i); - objList.add("strvalue" + i); - objList.add(String.valueOf((i % mod)+offset)); - writeRecords.add(new DefaultHCatRecord(objList)); - } - } - - @Override - protected List getPartitionKeys() { - List fields = new ArrayList(); - fields.add(new FieldSchema("p1", Constants.STRING_TYPE_NAME, "")); - return fields; - } - - @Override - protected List getTableColumns() { - List fields = new ArrayList(); - fields.add(new FieldSchema("c1", Constants.INT_TYPE_NAME, "")); - fields.add(new FieldSchema("c2", Constants.STRING_TYPE_NAME, "")); - return fields; - } - - - public void testHCatDynamicPartitionedTable() throws Exception { - - generateWriteRecords(20,5,0); - runMRCreate(null, dataColumns, writeRecords, 20,true); - - runMRRead(20); - - //Read with partition filter - runMRRead(4, "p1 = \"0\""); - runMRRead(8, "p1 = \"1\" or p1 = \"3\""); - runMRRead(4, "p1 = \"4\""); - - // read from hive to test - - String query = "select * from " + tableName; - int retCode = driver.run(query).getResponseCode(); - - if( retCode != 0 ) { - throw new Exception("Error " + retCode + " running query " + query); - } - - ArrayList res = new ArrayList(); - driver.getResults(res); - assertEquals(20, res.size()); - - - //Test for duplicate publish - IOException exc = null; - try { - generateWriteRecords(20,5,0); - Job job = runMRCreate(null, dataColumns, writeRecords, 20,false); - if (HcatTestUtils.isHadoop23()) { - new FileOutputCommitterContainer(job,null).cleanupJob(job); - } - } catch(IOException e) { - exc = e; - } - - assertTrue(exc != null); - assertTrue(exc instanceof HCatException); - assertTrue( "Got exception of type ["+((HCatException) exc).getErrorType().toString() - + "] Expected ERROR_PUBLISHING_PARTITION or ERROR_MOVE_FAILED", - (ErrorType.ERROR_PUBLISHING_PARTITION == ((HCatException) exc).getErrorType()) - || (ErrorType.ERROR_MOVE_FAILED == ((HCatException) exc).getErrorType()) + private static List writeRecords; + private static List dataColumns; + private static final Logger LOG = LoggerFactory.getLogger(TestHCatDynamicPartitioned.class); + private static final int NUM_RECORDS = 20; + private static final int NUM_PARTITIONS = 5; + + @BeforeClass + public static void generateInputData() throws Exception { + tableName = "testHCatDynamicPartitionedTable"; + generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0); + generateDataColumns(); + } + + private static void generateDataColumns() throws HCatException { + dataColumns = new ArrayList(); + dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, ""))); + dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.STRING_TYPE_NAME, ""))); + dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("p1", Constants.STRING_TYPE_NAME, ""))); + } + + private static void generateWriteRecords(int max, int mod, int offset) { + writeRecords = new ArrayList(); + + for (int i = 0; i < max; i++) { + List objList = new ArrayList(); + + objList.add(i); + objList.add("strvalue" + i); + objList.add(String.valueOf((i % mod) + offset)); + writeRecords.add(new DefaultHCatRecord(objList)); + } + } + + @Override + protected List getPartitionKeys() { + List fields = new ArrayList(); + fields.add(new FieldSchema("p1", Constants.STRING_TYPE_NAME, "")); + return fields; + } + + @Override + protected List getTableColumns() { + List fields = new ArrayList(); + fields.add(new FieldSchema("c1", Constants.INT_TYPE_NAME, "")); + fields.add(new FieldSchema("c2", Constants.STRING_TYPE_NAME, "")); + return fields; + } + + /** + * Run the dynamic partitioning test but with single map task + * @throws Exception + */ + @Test + public void testHCatDynamicPartitionedTable() throws Exception { + runHCatDynamicPartitionedTable(true); + } + + /** + * Run the dynamic partitioning test but with multiple map task. See HCATALOG-490 + * @throws Exception + */ + @Test + public void testHCatDynamicPartitionedTableMultipleTask() throws Exception { + runHCatDynamicPartitionedTable(false); + } + + protected void runHCatDynamicPartitionedTable(boolean asSingleMapTask) throws Exception { + generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0); + runMRCreate(null, dataColumns, writeRecords, NUM_RECORDS, true, asSingleMapTask); + + runMRRead(NUM_RECORDS); + + //Read with partition filter + runMRRead(4, "p1 = \"0\""); + runMRRead(8, "p1 = \"1\" or p1 = \"3\""); + runMRRead(4, "p1 = \"4\""); + + // read from hive to test + + String query = "select * from " + tableName; + int retCode = driver.run(query).getResponseCode(); + + if (retCode != 0) { + throw new Exception("Error " + retCode + " running query " + query); + } + + ArrayList res = new ArrayList(); + driver.getResults(res); + assertEquals(NUM_RECORDS, res.size()); + + + //Test for duplicate publish + IOException exc = null; + try { + generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0); + Job job = runMRCreate(null, dataColumns, writeRecords, NUM_RECORDS, false); + if (HcatTestUtils.isHadoop23()) { + new FileOutputCommitterContainer(job, null).cleanupJob(job); + } + } catch (IOException e) { + exc = e; + } + + assertTrue(exc != null); + assertTrue(exc instanceof HCatException); + assertTrue("Got exception of type [" + ((HCatException) exc).getErrorType().toString() + + "] Expected ERROR_PUBLISHING_PARTITION or ERROR_MOVE_FAILED", + (ErrorType.ERROR_PUBLISHING_PARTITION == ((HCatException) exc).getErrorType()) + || (ErrorType.ERROR_MOVE_FAILED == ((HCatException) exc).getErrorType()) ); - } + } -//TODO 1.0 miniCluster is slow this test times out, make it work + //TODO 1.0 miniCluster is slow this test times out, make it work // renaming test to make test framework skip it - public void _testHCatDynamicPartitionMaxPartitions() throws Exception { - HiveConf hc = new HiveConf(this.getClass()); + public void _testHCatDynamicPartitionMaxPartitions() throws Exception { + HiveConf hc = new HiveConf(this.getClass()); - int maxParts = hiveConf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS); - LOG.info("Max partitions allowed = {}", maxParts); + int maxParts = hiveConf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS); + LOG.info("Max partitions allowed = {}", maxParts); - IOException exc = null; - try { - generateWriteRecords(maxParts+5,maxParts+2,10); - runMRCreate(null,dataColumns,writeRecords,maxParts+5,false); - } catch(IOException e) { - exc = e; - } - - if (HCatConstants.HCAT_IS_DYNAMIC_MAX_PTN_CHECK_ENABLED){ - assertTrue(exc != null); - assertTrue(exc instanceof HCatException); - assertEquals(ErrorType.ERROR_TOO_MANY_DYNAMIC_PTNS, ((HCatException) exc).getErrorType()); - }else{ - assertTrue(exc == null); - runMRRead(maxParts+5); + IOException exc = null; + try { + generateWriteRecords(maxParts + 5, maxParts + 2, 10); + runMRCreate(null, dataColumns, writeRecords, maxParts + 5, false); + } catch (IOException e) { + exc = e; + } + + if (HCatConstants.HCAT_IS_DYNAMIC_MAX_PTN_CHECK_ENABLED) { + assertTrue(exc != null); + assertTrue(exc instanceof HCatException); + assertEquals(ErrorType.ERROR_TOO_MANY_DYNAMIC_PTNS, ((HCatException) exc).getErrorType()); + } else { + assertTrue(exc == null); + runMRRead(maxParts + 5); + } } - } } Modified: incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java?rev=1388607&r1=1388606&r2=1388607&view=diff ============================================================================== --- incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java (original) +++ incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java Fri Sep 21 18:37:05 2012 @@ -32,99 +32,105 @@ import org.apache.hcatalog.data.DefaultH import org.apache.hcatalog.data.HCatRecord; import org.apache.hcatalog.data.schema.HCatFieldSchema; import org.apache.hcatalog.data.schema.HCatSchemaUtils; +import org.junit.BeforeClass; +import org.junit.Test; + +import static junit.framework.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public class TestHCatNonPartitioned extends HCatMapReduceTest { - private List writeRecords; - List partitionColumns; + private static List writeRecords; + static List partitionColumns; + + @BeforeClass + public static void oneTimeSetUp() throws Exception { - @Override - protected void initialize() throws HCatException { + dbName = null; //test if null dbName works ("default" is used) + tableName = "testHCatNonPartitionedTable"; - dbName = null; //test if null dbName works ("default" is used) - tableName = "testHCatNonPartitionedTable"; + writeRecords = new ArrayList(); - writeRecords = new ArrayList(); + for (int i = 0; i < 20; i++) { + List objList = new ArrayList(); - for(int i = 0;i < 20;i++) { - List objList = new ArrayList(); + objList.add(i); + objList.add("strvalue" + i); + writeRecords.add(new DefaultHCatRecord(objList)); + } - objList.add(i); - objList.add("strvalue" + i); - writeRecords.add(new DefaultHCatRecord(objList)); + partitionColumns = new ArrayList(); + partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, ""))); + partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.STRING_TYPE_NAME, ""))); } - partitionColumns = new ArrayList(); - partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, ""))); - partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.STRING_TYPE_NAME, ""))); - } - - @Override - protected List getPartitionKeys() { - List fields = new ArrayList(); - //empty list, non partitioned - return fields; - } - - @Override - protected List getTableColumns() { - List fields = new ArrayList(); - fields.add(new FieldSchema("c1", Constants.INT_TYPE_NAME, "")); - fields.add(new FieldSchema("c2", Constants.STRING_TYPE_NAME, "")); - return fields; - } - - - public void testHCatNonPartitionedTable() throws Exception { - - Map partitionMap = new HashMap(); - runMRCreate(null, partitionColumns, writeRecords, 10,true); - - //Test for duplicate publish - IOException exc = null; - try { - runMRCreate(null, partitionColumns, writeRecords, 20,true); - } catch(IOException e) { - exc = e; + @Override + protected List getPartitionKeys() { + List fields = new ArrayList(); + //empty list, non partitioned + return fields; } - assertTrue(exc != null); - assertTrue(exc instanceof HCatException); - assertEquals(ErrorType.ERROR_NON_EMPTY_TABLE, ((HCatException) exc).getErrorType()); - - //Test for publish with invalid partition key name - exc = null; - partitionMap.clear(); - partitionMap.put("px", "p1value2"); - - try { - runMRCreate(partitionMap, partitionColumns, writeRecords, 20,true); - } catch(IOException e) { - exc = e; + @Override + protected List getTableColumns() { + List fields = new ArrayList(); + fields.add(new FieldSchema("c1", Constants.INT_TYPE_NAME, "")); + fields.add(new FieldSchema("c2", Constants.STRING_TYPE_NAME, "")); + return fields; } - assertTrue(exc != null); - assertTrue(exc instanceof HCatException); - assertEquals(ErrorType.ERROR_INVALID_PARTITION_VALUES, ((HCatException) exc).getErrorType()); - //Read should get 10 rows - runMRRead(10); + @Test + public void testHCatNonPartitionedTable() throws Exception { - hiveReadTest(); - } + Map partitionMap = new HashMap(); + runMRCreate(null, partitionColumns, writeRecords, 10, true); - //Test that data inserted through hcatoutputformat is readable from hive - private void hiveReadTest() throws Exception { + //Test for duplicate publish + IOException exc = null; + try { + runMRCreate(null, partitionColumns, writeRecords, 20, true); + } catch (IOException e) { + exc = e; + } + + assertTrue(exc != null); + assertTrue(exc instanceof HCatException); + assertEquals(ErrorType.ERROR_NON_EMPTY_TABLE, ((HCatException) exc).getErrorType()); + + //Test for publish with invalid partition key name + exc = null; + partitionMap.clear(); + partitionMap.put("px", "p1value2"); + + try { + runMRCreate(partitionMap, partitionColumns, writeRecords, 20, true); + } catch (IOException e) { + exc = e; + } + + assertTrue(exc != null); + assertTrue(exc instanceof HCatException); + assertEquals(ErrorType.ERROR_INVALID_PARTITION_VALUES, ((HCatException) exc).getErrorType()); - String query = "select * from " + tableName; - int retCode = driver.run(query).getResponseCode(); + //Read should get 10 rows + runMRRead(10); - if( retCode != 0 ) { - throw new Exception("Error " + retCode + " running query " + query); + hiveReadTest(); } - ArrayList res = new ArrayList(); - driver.getResults(res); - assertEquals(10, res.size()); - } + //Test that data inserted through hcatoutputformat is readable from hive + private void hiveReadTest() throws Exception { + + String query = "select * from " + tableName; + int retCode = driver.run(query).getResponseCode(); + + if (retCode != 0) { + throw new Exception("Error " + retCode + " running query " + query); + } + + ArrayList res = new ArrayList(); + driver.getResults(res); + assertEquals(10, res.size()); + } } Modified: incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java?rev=1388607&r1=1388606&r2=1388607&view=diff ============================================================================== --- incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java (original) +++ incubator/hcatalog/branches/branch-0.4/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java Fri Sep 21 18:37:05 2012 @@ -33,312 +33,318 @@ import org.apache.hcatalog.data.HCatReco import org.apache.hcatalog.data.schema.HCatFieldSchema; import org.apache.hcatalog.data.schema.HCatSchema; import org.apache.hcatalog.data.schema.HCatSchemaUtils; +import org.junit.BeforeClass; +import org.junit.Test; + +import static junit.framework.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public class TestHCatPartitioned extends HCatMapReduceTest { - private List writeRecords; - private List partitionColumns; + private static List writeRecords; + private static List partitionColumns; - @Override - protected void initialize() throws Exception { + @BeforeClass + public static void oneTimeSetUp() throws Exception { - tableName = "testHCatPartitionedTable"; - writeRecords = new ArrayList(); + tableName = "testHCatPartitionedTable"; + writeRecords = new ArrayList(); - for(int i = 0;i < 20;i++) { - List objList = new ArrayList(); + for (int i = 0; i < 20; i++) { + List objList = new ArrayList(); - objList.add(i); - objList.add("strvalue" + i); - writeRecords.add(new DefaultHCatRecord(objList)); - } + objList.add(i); + objList.add("strvalue" + i); + writeRecords.add(new DefaultHCatRecord(objList)); + } - partitionColumns = new ArrayList(); - partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, ""))); - partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.STRING_TYPE_NAME, ""))); - } - - - @Override - protected List getPartitionKeys() { - List fields = new ArrayList(); - //Defining partition names in unsorted order - fields.add(new FieldSchema("PaRT1", Constants.STRING_TYPE_NAME, "")); - fields.add(new FieldSchema("part0", Constants.STRING_TYPE_NAME, "")); - return fields; - } - - @Override - protected List getTableColumns() { - List fields = new ArrayList(); - fields.add(new FieldSchema("c1", Constants.INT_TYPE_NAME, "")); - fields.add(new FieldSchema("c2", Constants.STRING_TYPE_NAME, "")); - return fields; - } - - - public void testHCatPartitionedTable() throws Exception { - - Map partitionMap = new HashMap(); - partitionMap.put("part1", "p1value1"); - partitionMap.put("part0", "p0value1"); - - runMRCreate(partitionMap, partitionColumns, writeRecords, 10,true); - - partitionMap.clear(); - partitionMap.put("PART1", "p1value2"); - partitionMap.put("PART0", "p0value2"); - - runMRCreate(partitionMap, partitionColumns, writeRecords, 20,true); - - //Test for duplicate publish - IOException exc = null; - try { - runMRCreate(partitionMap, partitionColumns, writeRecords, 20,true); - } catch(IOException e) { - exc = e; + partitionColumns = new ArrayList(); + partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, ""))); + partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.STRING_TYPE_NAME, ""))); } - assertTrue(exc != null); - assertTrue(exc instanceof HCatException); - assertEquals(ErrorType.ERROR_DUPLICATE_PARTITION, ((HCatException) exc).getErrorType()); - - //Test for publish with invalid partition key name - exc = null; - partitionMap.clear(); - partitionMap.put("px1", "p1value2"); - partitionMap.put("px0", "p0value2"); - - try { - runMRCreate(partitionMap, partitionColumns, writeRecords, 20,true); - } catch(IOException e) { - exc = e; + + @Override + protected List getPartitionKeys() { + List fields = new ArrayList(); + //Defining partition names in unsorted order + fields.add(new FieldSchema("PaRT1", Constants.STRING_TYPE_NAME, "")); + fields.add(new FieldSchema("part0", Constants.STRING_TYPE_NAME, "")); + return fields; } - assertTrue(exc != null); - assertTrue(exc instanceof HCatException); - assertEquals(ErrorType.ERROR_MISSING_PARTITION_KEY, ((HCatException) exc).getErrorType()); - - //Test for publish with missing partition key values - exc = null; - partitionMap.clear(); - partitionMap.put("px", "p1value2"); - - try { - runMRCreate(partitionMap, partitionColumns, writeRecords, 20,true); - } catch(IOException e) { - exc = e; + @Override + protected List getTableColumns() { + List fields = new ArrayList(); + fields.add(new FieldSchema("c1", Constants.INT_TYPE_NAME, "")); + fields.add(new FieldSchema("c2", Constants.STRING_TYPE_NAME, "")); + return fields; } - assertTrue(exc != null); - assertTrue(exc instanceof HCatException); - assertEquals(ErrorType.ERROR_INVALID_PARTITION_VALUES, ((HCatException) exc).getErrorType()); + @Test + public void testHCatPartitionedTable() throws Exception { - //Test for null partition value map - exc = null; - try { - runMRCreate(null, partitionColumns, writeRecords, 20,false); - } catch(IOException e) { - exc = e; - } + Map partitionMap = new HashMap(); + partitionMap.put("part1", "p1value1"); + partitionMap.put("part0", "p0value1"); + + runMRCreate(partitionMap, partitionColumns, writeRecords, 10, true); + + partitionMap.clear(); + partitionMap.put("PART1", "p1value2"); + partitionMap.put("PART0", "p0value2"); + + runMRCreate(partitionMap, partitionColumns, writeRecords, 20, true); + + //Test for duplicate publish + IOException exc = null; + try { + runMRCreate(partitionMap, partitionColumns, writeRecords, 20, true); + } catch (IOException e) { + exc = e; + } + + assertTrue(exc != null); + assertTrue(exc instanceof HCatException); + assertEquals(ErrorType.ERROR_DUPLICATE_PARTITION, ((HCatException) exc).getErrorType()); - assertTrue(exc == null); + //Test for publish with invalid partition key name + exc = null; + partitionMap.clear(); + partitionMap.put("px1", "p1value2"); + partitionMap.put("px0", "p0value2"); + + try { + runMRCreate(partitionMap, partitionColumns, writeRecords, 20, true); + } catch (IOException e) { + exc = e; + } + + assertTrue(exc != null); + assertTrue(exc instanceof HCatException); + assertEquals(ErrorType.ERROR_MISSING_PARTITION_KEY, ((HCatException) exc).getErrorType()); + + //Test for publish with missing partition key values + exc = null; + partitionMap.clear(); + partitionMap.put("px", "p1value2"); + + try { + runMRCreate(partitionMap, partitionColumns, writeRecords, 20, true); + } catch (IOException e) { + exc = e; + } + + assertTrue(exc != null); + assertTrue(exc instanceof HCatException); + assertEquals(ErrorType.ERROR_INVALID_PARTITION_VALUES, ((HCatException) exc).getErrorType()); + + + //Test for null partition value map + exc = null; + try { + runMRCreate(null, partitionColumns, writeRecords, 20, false); + } catch (IOException e) { + exc = e; + } + + assertTrue(exc == null); // assertTrue(exc instanceof HCatException); // assertEquals(ErrorType.ERROR_PUBLISHING_PARTITION, ((HCatException) exc).getErrorType()); - // With Dynamic partitioning, this isn't an error that the keyValues specified didn't values + // With Dynamic partitioning, this isn't an error that the keyValues specified didn't values - //Read should get 10 + 20 rows - runMRRead(30); + //Read should get 10 + 20 rows + runMRRead(30); - //Read with partition filter - runMRRead(10, "part1 = \"p1value1\""); - runMRRead(20, "part1 = \"p1value2\""); - runMRRead(30, "part1 = \"p1value1\" or part1 = \"p1value2\""); - runMRRead(10, "part0 = \"p0value1\""); - runMRRead(20, "part0 = \"p0value2\""); - runMRRead(30, "part0 = \"p0value1\" or part0 = \"p0value2\""); + //Read with partition filter + runMRRead(10, "part1 = \"p1value1\""); + runMRRead(20, "part1 = \"p1value2\""); + runMRRead(30, "part1 = \"p1value1\" or part1 = \"p1value2\""); + runMRRead(10, "part0 = \"p0value1\""); + runMRRead(20, "part0 = \"p0value2\""); + runMRRead(30, "part0 = \"p0value1\" or part0 = \"p0value2\""); - tableSchemaTest(); - columnOrderChangeTest(); - hiveReadTest(); - } + tableSchemaTest(); + columnOrderChangeTest(); + hiveReadTest(); + } - //test that new columns gets added to table schema - private void tableSchemaTest() throws Exception { + //test that new columns gets added to table schema + private void tableSchemaTest() throws Exception { - HCatSchema tableSchema = getTableSchema(); + HCatSchema tableSchema = getTableSchema(); - assertEquals(4, tableSchema.getFields().size()); + assertEquals(4, tableSchema.getFields().size()); - //Update partition schema to have 3 fields - partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c3", Constants.STRING_TYPE_NAME, ""))); + //Update partition schema to have 3 fields + partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c3", Constants.STRING_TYPE_NAME, ""))); - writeRecords = new ArrayList(); + writeRecords = new ArrayList(); - for(int i = 0;i < 20;i++) { - List objList = new ArrayList(); + for (int i = 0; i < 20; i++) { + List objList = new ArrayList(); - objList.add(i); - objList.add("strvalue" + i); - objList.add("str2value" + i); + objList.add(i); + objList.add("strvalue" + i); + objList.add("str2value" + i); - writeRecords.add(new DefaultHCatRecord(objList)); - } + writeRecords.add(new DefaultHCatRecord(objList)); + } - Map partitionMap = new HashMap(); - partitionMap.put("part1", "p1value5"); - partitionMap.put("part0", "p0value5"); - - runMRCreate(partitionMap, partitionColumns, writeRecords, 10,true); - - tableSchema = getTableSchema(); - - //assert that c3 has got added to table schema - assertEquals(5, tableSchema.getFields().size()); - assertEquals("c1", tableSchema.getFields().get(0).getName()); - assertEquals("c2", tableSchema.getFields().get(1).getName()); - assertEquals("c3", tableSchema.getFields().get(2).getName()); - assertEquals("part1", tableSchema.getFields().get(3).getName()); - assertEquals("part0", tableSchema.getFields().get(4).getName()); - - //Test that changing column data type fails - partitionMap.clear(); - partitionMap.put("part1", "p1value6"); - partitionMap.put("part0", "p0value6"); - - partitionColumns = new ArrayList(); - partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, ""))); - partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.INT_TYPE_NAME, ""))); - - IOException exc = null; - try { - runMRCreate(partitionMap, partitionColumns, writeRecords, 20,true); - } catch(IOException e) { - exc = e; - } + Map partitionMap = new HashMap(); + partitionMap.put("part1", "p1value5"); + partitionMap.put("part0", "p0value5"); - assertTrue(exc != null); - assertTrue(exc instanceof HCatException); - assertEquals(ErrorType.ERROR_SCHEMA_TYPE_MISMATCH, ((HCatException) exc).getErrorType()); - - //Test that partition key is not allowed in data - partitionColumns = new ArrayList(); - partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, ""))); - partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.STRING_TYPE_NAME, ""))); - partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c3", Constants.STRING_TYPE_NAME, ""))); - partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("part1", Constants.STRING_TYPE_NAME, ""))); - - List recordsContainingPartitionCols = new ArrayList(20); - for(int i = 0;i < 20;i++) { - List objList = new ArrayList(); - - objList.add(i); - objList.add("c2value" + i); - objList.add("c3value" + i); - objList.add("p1value6"); + runMRCreate(partitionMap, partitionColumns, writeRecords, 10, true); - recordsContainingPartitionCols.add(new DefaultHCatRecord(objList)); - } + tableSchema = getTableSchema(); - exc = null; - try { - runMRCreate(partitionMap, partitionColumns, recordsContainingPartitionCols, 20,true); - } catch(IOException e) { - exc = e; - } + //assert that c3 has got added to table schema + assertEquals(5, tableSchema.getFields().size()); + assertEquals("c1", tableSchema.getFields().get(0).getName()); + assertEquals("c2", tableSchema.getFields().get(1).getName()); + assertEquals("c3", tableSchema.getFields().get(2).getName()); + assertEquals("part1", tableSchema.getFields().get(3).getName()); + assertEquals("part0", tableSchema.getFields().get(4).getName()); - List records= runMRRead(20,"part1 = \"p1value6\""); - assertEquals(20, records.size()); - records= runMRRead(20,"part0 = \"p0value6\""); - assertEquals(20, records.size()); - Integer i =0; - for(HCatRecord rec : records){ - assertEquals(5, rec.size()); - assertTrue(rec.get(0).equals(i)); - assertTrue(rec.get(1).equals("c2value"+i)); - assertTrue(rec.get(2).equals("c3value"+i)); - assertTrue(rec.get(3).equals("p1value6")); - assertTrue(rec.get(4).equals("p0value6")); - i++; - } - } + //Test that changing column data type fails + partitionMap.clear(); + partitionMap.put("part1", "p1value6"); + partitionMap.put("part0", "p0value6"); - //check behavior while change the order of columns - private void columnOrderChangeTest() throws Exception { + partitionColumns = new ArrayList(); + partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, ""))); + partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.INT_TYPE_NAME, ""))); - HCatSchema tableSchema = getTableSchema(); + IOException exc = null; + try { + runMRCreate(partitionMap, partitionColumns, writeRecords, 20, true); + } catch (IOException e) { + exc = e; + } - assertEquals(5, tableSchema.getFields().size()); + assertTrue(exc != null); + assertTrue(exc instanceof HCatException); + assertEquals(ErrorType.ERROR_SCHEMA_TYPE_MISMATCH, ((HCatException) exc).getErrorType()); - partitionColumns = new ArrayList(); - partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, ""))); - partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c3", Constants.STRING_TYPE_NAME, ""))); - partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.STRING_TYPE_NAME, ""))); + //Test that partition key is not allowed in data + partitionColumns = new ArrayList(); + partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, ""))); + partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.STRING_TYPE_NAME, ""))); + partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c3", Constants.STRING_TYPE_NAME, ""))); + partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("part1", Constants.STRING_TYPE_NAME, ""))); + List recordsContainingPartitionCols = new ArrayList(20); + for (int i = 0; i < 20; i++) { + List objList = new ArrayList(); - writeRecords = new ArrayList(); + objList.add(i); + objList.add("c2value" + i); + objList.add("c3value" + i); + objList.add("p1value6"); - for(int i = 0;i < 10;i++) { - List objList = new ArrayList(); + recordsContainingPartitionCols.add(new DefaultHCatRecord(objList)); + } - objList.add(i); - objList.add("co strvalue" + i); - objList.add("co str2value" + i); + exc = null; + try { + runMRCreate(partitionMap, partitionColumns, recordsContainingPartitionCols, 20, true); + } catch (IOException e) { + exc = e; + } - writeRecords.add(new DefaultHCatRecord(objList)); + List records = runMRRead(20, "part1 = \"p1value6\""); + assertEquals(20, records.size()); + records = runMRRead(20, "part0 = \"p0value6\""); + assertEquals(20, records.size()); + Integer i = 0; + for (HCatRecord rec : records) { + assertEquals(5, rec.size()); + assertTrue(rec.get(0).equals(i)); + assertTrue(rec.get(1).equals("c2value" + i)); + assertTrue(rec.get(2).equals("c3value" + i)); + assertTrue(rec.get(3).equals("p1value6")); + assertTrue(rec.get(4).equals("p0value6")); + i++; + } } - Map partitionMap = new HashMap(); - partitionMap.put("part1", "p1value8"); - partitionMap.put("part0", "p0value8"); - - Exception exc = null; - try { - runMRCreate(partitionMap, partitionColumns, writeRecords, 10,true); - } catch(IOException e) { - exc = e; - } + //check behavior while change the order of columns + private void columnOrderChangeTest() throws Exception { - assertTrue(exc != null); - assertTrue(exc instanceof HCatException); - assertEquals(ErrorType.ERROR_SCHEMA_COLUMN_MISMATCH, ((HCatException) exc).getErrorType()); + HCatSchema tableSchema = getTableSchema(); + assertEquals(5, tableSchema.getFields().size()); - partitionColumns = new ArrayList(); - partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, ""))); - partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.STRING_TYPE_NAME, ""))); + partitionColumns = new ArrayList(); + partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, ""))); + partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c3", Constants.STRING_TYPE_NAME, ""))); + partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.STRING_TYPE_NAME, ""))); - writeRecords = new ArrayList(); - for(int i = 0;i < 10;i++) { - List objList = new ArrayList(); + writeRecords = new ArrayList(); - objList.add(i); - objList.add("co strvalue" + i); + for (int i = 0; i < 10; i++) { + List objList = new ArrayList(); - writeRecords.add(new DefaultHCatRecord(objList)); - } + objList.add(i); + objList.add("co strvalue" + i); + objList.add("co str2value" + i); + + writeRecords.add(new DefaultHCatRecord(objList)); + } + + Map partitionMap = new HashMap(); + partitionMap.put("part1", "p1value8"); + partitionMap.put("part0", "p0value8"); - runMRCreate(partitionMap, partitionColumns, writeRecords, 10,true); + Exception exc = null; + try { + runMRCreate(partitionMap, partitionColumns, writeRecords, 10, true); + } catch (IOException e) { + exc = e; + } - //Read should get 10 + 20 + 10 + 10 + 20 rows - runMRRead(70); - } + assertTrue(exc != null); + assertTrue(exc instanceof HCatException); + assertEquals(ErrorType.ERROR_SCHEMA_COLUMN_MISMATCH, ((HCatException) exc).getErrorType()); - //Test that data inserted through hcatoutputformat is readable from hive - private void hiveReadTest() throws Exception { - String query = "select * from " + tableName; - int retCode = driver.run(query).getResponseCode(); + partitionColumns = new ArrayList(); + partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, ""))); + partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.STRING_TYPE_NAME, ""))); - if( retCode != 0 ) { - throw new Exception("Error " + retCode + " running query " + query); + writeRecords = new ArrayList(); + + for (int i = 0; i < 10; i++) { + List objList = new ArrayList(); + + objList.add(i); + objList.add("co strvalue" + i); + + writeRecords.add(new DefaultHCatRecord(objList)); + } + + runMRCreate(partitionMap, partitionColumns, writeRecords, 10, true); + + //Read should get 10 + 20 + 10 + 10 + 20 rows + runMRRead(70); } - ArrayList res = new ArrayList(); - driver.getResults(res); - assertEquals(70, res.size()); - } + //Test that data inserted through hcatoutputformat is readable from hive + private void hiveReadTest() throws Exception { + + String query = "select * from " + tableName; + int retCode = driver.run(query).getResponseCode(); + + if (retCode != 0) { + throw new Exception("Error " + retCode + " running query " + query); + } + + ArrayList res = new ArrayList(); + driver.getResults(res); + assertEquals(70, res.size()); + } }