hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cdoug...@apache.org
Subject svn commit: r888761 - in /hadoop/mapreduce/trunk: ./ src/contrib/vertica/src/java/org/apache/hadoop/vertica/ src/contrib/vertica/src/test/org/apache/hadoop/vertica/
Date Wed, 09 Dec 2009 10:38:17 GMT
Author: cdouglas
Date: Wed Dec  9 10:38:16 2009
New Revision: 888761

URL: http://svn.apache.org/viewvc?rev=888761&view=rev
Log:
MAPREDUCE-1230. Fix handling of null records in VerticaInputFormat. Contributed by Omer Trajman

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaRecord.java
    hadoop/mapreduce/trunk/src/contrib/vertica/src/test/org/apache/hadoop/vertica/AllTests.java
    hadoop/mapreduce/trunk/src/contrib/vertica/src/test/org/apache/hadoop/vertica/TestExample.java
    hadoop/mapreduce/trunk/src/contrib/vertica/src/test/org/apache/hadoop/vertica/TestVertica.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=888761&r1=888760&r2=888761&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Wed Dec  9 10:38:16 2009
@@ -976,3 +976,6 @@
 
     MAPREDUCE-952. Remove inadvertently reintroduced Task.Counter enum. (Jothi
     Padmanabhan via cdouglas)
+
+    MAPREDUCE-1230. Fix handling of null records in VerticaInputFormat. (Omer
+    Trajman via cdouglas)

Modified: hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaRecord.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaRecord.java?rev=888761&r1=888760&r2=888761&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaRecord.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaRecord.java Wed Dec  9 10:38:16 2009
@@ -178,7 +178,7 @@
     if (i >= values.size())
       throw new IndexOutOfBoundsException("Index " + i
           + " greater than input size " + values.size());
-    if (validate) {
+    if (validate && value != null) {
       Integer type = types.get(i);
       switch (type) {
       case Types.BIGINT:
@@ -278,7 +278,7 @@
   private void objectTypes() {
     for (Object obj : values) {
       if (obj == null) {
-        this.types.add(null);
+        this.types.add(Types.NULL);
       } else if (obj instanceof Long) {
         this.types.add(Types.BIGINT);
       } else if (obj instanceof LongWritable) {
@@ -343,6 +343,9 @@
       // switch statement uses fall through to handle type variations
       // e.g. type specified as BIGINT but passed in as Integer
       switch (type) {
+      case Types.NULL:
+        sb.append("");
+        break;
       case Types.BIGINT:
         if (obj instanceof Long) {
           sb.append(((Long) obj).toString());
@@ -395,7 +398,8 @@
       case Types.BINARY:
       case Types.LONGVARBINARY:
       case Types.VARBINARY:
-        sb.append(ByteBuffer.wrap((byte[]) obj).asCharBuffer());
+        if(obj == null) sb.append("");
+        else sb.append(ByteBuffer.wrap((byte[]) obj).asCharBuffer());
         break;
       case Types.BIT:
       case Types.BOOLEAN:
@@ -452,7 +456,8 @@
           sb.append(sqlfmt.format((Timestamp) obj));
         break;
       default:
-        throw new RuntimeException("Unknown type value " + types.get(i));
+        if(obj == null) sb.append("");
+        else throw new RuntimeException("Unknown type value " + types.get(i));
       }
       if (i < columns - 1)
         sb.append(delimiterArg);
@@ -473,6 +478,9 @@
     for (int i = 0; i < columns; i++) {
       int type = types.get(i);
       switch (type) {
+      case Types.NULL:
+        values.add(null);
+        break;
       case Types.BIGINT:
         values.add(in.readLong());
         break;
@@ -551,13 +559,20 @@
   @Override
   public void write(DataOutput out) throws IOException {
     out.writeInt(columns);
-    for (Integer type : types)
-      out.writeInt(type);
+    
+    for (int i = 0; i < columns; i++) {
+      Object obj = values.get(i);
+      Integer type = types.get(i);
+      if(obj == null) out.writeInt(Types.NULL);
+      else out.writeInt(type);
+    }
 
     for (int i = 0; i < columns; i++) {
       Object obj = values.get(i);
       Integer type = types.get(i);
 
+      if(obj == null) continue;
+      
       switch (type) {
       case Types.BIGINT:
         out.writeLong((Long) obj);

Modified: hadoop/mapreduce/trunk/src/contrib/vertica/src/test/org/apache/hadoop/vertica/AllTests.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/vertica/src/test/org/apache/hadoop/vertica/AllTests.java?rev=888761&r1=888760&r2=888761&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/vertica/src/test/org/apache/hadoop/vertica/AllTests.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/vertica/src/test/org/apache/hadoop/vertica/AllTests.java Wed Dec  9 10:38:16 2009
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.vertica;
 
-import java.io.FileNotFoundException;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;

Modified: hadoop/mapreduce/trunk/src/contrib/vertica/src/test/org/apache/hadoop/vertica/TestExample.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/vertica/src/test/org/apache/hadoop/vertica/TestExample.java?rev=888761&r1=888760&r2=888761&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/vertica/src/test/org/apache/hadoop/vertica/TestExample.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/vertica/src/test/org/apache/hadoop/vertica/TestExample.java Wed Dec  9 10:38:16 2009
@@ -29,6 +29,7 @@
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Cluster;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Reducer;
@@ -87,13 +88,16 @@
   }
 
   public Job getJob() throws IOException {
-    Job job = new Job();
+    Configuration conf = new Configuration(true);
+    Cluster cluster = new Cluster(conf);
+    Job job = Job.getInstance(cluster);
+    
+    conf = job.getConfiguration();
+    conf.set("mapreduce.job.tracker", "local");
+
     job.setJarByClass(TestExample.class);
     job.setJobName("vertica test");
 
-    Configuration conf = job.getConfiguration();
-    conf.set("mapred.job.tracker", "local");
-
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(VerticaRecord.class);
     job.setInputFormatClass(VerticaInputFormat.class);

Modified: hadoop/mapreduce/trunk/src/contrib/vertica/src/test/org/apache/hadoop/vertica/TestVertica.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/vertica/src/test/org/apache/hadoop/vertica/TestVertica.java?rev=888761&r1=888760&r2=888761&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/vertica/src/test/org/apache/hadoop/vertica/TestVertica.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/vertica/src/test/org/apache/hadoop/vertica/TestVertica.java Wed Dec  9 10:38:16 2009
@@ -37,6 +37,7 @@
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Cluster;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -71,7 +72,8 @@
 
   public Job getVerticaJob() throws IOException {
     Configuration conf = new Configuration(true);
-    Job job = new Job(conf, "TestVertica");
+    Cluster cluster = new Cluster(conf);
+    Job job = Job.getInstance(cluster);
     job.setJarByClass(VerticaTestMR.class);
 
     VerticaConfiguration.configureVertica(job.getConfiguration(),
@@ -158,6 +160,51 @@
     values
         .add(new Timestamp(tmstmpfmt.parse("2007-08-09 6:07:05.06").getTime())); // TIMESTAMP
 
+    types.add(Types.BIGINT);
+    values.add(null); // BIGINT
+    types.add(Types.INTEGER);
+    values.add(null); // INTGER
+    types.add(Types.TINYINT);
+    values.add(null); // TINYINT
+    types.add(Types.SMALLINT);
+    values.add(null); // SMALLINT
+    types.add(Types.REAL);
+    values.add(null); // REAL
+    types.add(Types.DECIMAL);
+    values.add(null); // DECIMAL
+    types.add(Types.NUMERIC);
+    values.add(null); // NUMERIC
+    types.add(Types.DOUBLE);
+    values.add(null); // DOUBLE
+    types.add(Types.FLOAT);
+    values.add(null); // FLOAT
+    types.add(Types.BINARY);
+    values.add(null); // BINARY
+    types.add(Types.LONGVARBINARY);
+    values.add(null); // LONGVARBINARY
+    types.add(Types.VARBINARY);
+    values.add(null); // VARBINARY
+    types.add(Types.BOOLEAN);
+    values.add(null); // BOOLEAN
+    types.add(Types.CHAR);
+    values.add(null); // CHAR
+    types.add(Types.LONGNVARCHAR);
+    values.add(null); // LONGNVARCHAR
+    types.add(Types.LONGVARCHAR);
+    values.add(null); // LONGVARCHAR
+    types.add(Types.NCHAR);
+    values.add(null); // NCHAR
+    types.add(Types.VARCHAR);
+    values.add(null); // VARCHAR
+    types.add(Types.DATE);
+    values.add(null); // DATE
+    types.add(Types.TIME);
+    values.add(null); // TIME
+    types.add(Types.TIMESTAMP);
+    values
+        .add(null); // TIMESTAMP
+    
+    
     String sql1 = null;
     sql1 = recordTest(types, values, out, in, true);
     
@@ -191,7 +238,8 @@
 
     // compare values
     for(int i = 0; i < values.size(); i++)
-      if(values.get(i).getClass().isArray()) {
+      if(values.get(i) == null) assertSame("Vertica Record serialized value " + i + " is null", values.get(i), new_values.get(i));
+      else if(values.get(i).getClass().isArray()) {
         Object a = values.get(i);
         Object b = new_values.get(i);
         for(int j = 0; j < Array.getLength(a); j++)
@@ -255,17 +303,17 @@
     List<InputSplit> splits = null;
 
     Configuration conf = job.getConfiguration();
-    conf.setInt("mapred.map.tasks", 1);
+    conf.setInt("mapreduce.job.maps", 1);
     JobContext context = new JobContextImpl(conf, new JobID());
 
     splits = input.getSplits(context);
     assert splits.size() == 1;
 
-    conf.setInt("mapred.map.tasks", 3);
+    conf.setInt("mapreduce.job.maps", 3);
     splits = input.getSplits(context);
     assert splits.size() == 3;
 
-    conf.setInt("mapred.map.tasks", 10);
+    conf.setInt("mapreduce.job.maps", 10);
     splits = input.getSplits(context);
     assert splits.size() == 10;
   }



Mime
View raw message