hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cdoug...@apache.org
Subject svn commit: r902706 - in /hadoop/mapreduce/branches/branch-0.21: ./ src/contrib/vertica/src/java/org/apache/hadoop/vertica/ src/contrib/vertica/src/test/org/apache/hadoop/vertica/
Date Mon, 25 Jan 2010 05:00:01 GMT
Author: cdouglas
Date: Mon Jan 25 05:00:00 2010
New Revision: 902706

URL: http://svn.apache.org/viewvc?rev=902706&view=rev
Log:
MAPREDUCE-1097. Add support for Vertica 3.5 to its contrib module. Contributed by Omer Trajman

Modified:
    hadoop/mapreduce/branches/branch-0.21/CHANGES.txt
    hadoop/mapreduce/branches/branch-0.21/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaConfiguration.java
    hadoop/mapreduce/branches/branch-0.21/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaOutputFormat.java
    hadoop/mapreduce/branches/branch-0.21/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaRecord.java
    hadoop/mapreduce/branches/branch-0.21/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaRecordWriter.java
    hadoop/mapreduce/branches/branch-0.21/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaStreamingRecordWriter.java
    hadoop/mapreduce/branches/branch-0.21/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaUtil.java
    hadoop/mapreduce/branches/branch-0.21/src/contrib/vertica/src/test/org/apache/hadoop/vertica/TestVertica.java

Modified: hadoop/mapreduce/branches/branch-0.21/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/CHANGES.txt?rev=902706&r1=902705&r2=902706&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/branch-0.21/CHANGES.txt Mon Jan 25 05:00:00 2010
@@ -442,6 +442,9 @@
     MAPREDUCE-1317. Reduce the memory footprint of Rumen objects by interning
     host Strings. (Hong Tang via cdouglas)
 
+    MAPREDUCE-1097. Add support for Vertica 3.5 to its contrib module. (Omer
+    Trajman via cdouglas)
+
   BUG FIXES
 
     MAPREDUCE-1258. Fix fair scheduler event log not logging job info.

Modified: hadoop/mapreduce/branches/branch-0.21/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaConfiguration.java?rev=902706&r1=902705&r2=902706&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaConfiguration.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaConfiguration.java Mon Jan 25 05:00:00 2010
@@ -55,6 +55,9 @@
  * @see VerticaOutputFormat#setOutput(Job, String, boolean, String...)
  */
 public class VerticaConfiguration {
+  /** Vertica Version Constants */
+  public static final Integer VERSION_3_5 = 305;
+  
   /** Class name for Vertica JDBC Driver */
   public static final String VERTICA_DRIVER_CLASS = "com.vertica.Driver";
 

Modified: hadoop/mapreduce/branches/branch-0.21/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaOutputFormat.java?rev=902706&r1=902705&r2=902706&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaOutputFormat.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaOutputFormat.java Mon Jan 25 05:00:00 2010
@@ -63,7 +63,7 @@
    * @param dropTable
    */
   public static void setOutput(Job job, String tableName, boolean dropTable) {
-    setOutput(job, tableName, dropTable);
+    setOutput(job, tableName, dropTable, (String[])null);
   }
 
   /**
@@ -193,39 +193,46 @@
     stmt.execute("select create_projection_design('" + designName + "', '', '"
         + designTables.toString() + "')");
 
-    rs = stmt.executeQuery("select get_design_script('" + designName + "', '"
-        + designName + "')");
-    rs.next();
-    String[] projSet = rs.getString(1).split(";");
-    for (String proj : projSet) {
-      stmt.execute(proj);
-    }
-    stmt.execute("select start_refresh()");
-
-    // pool for refresh complete
-    boolean refreshing = true;
-    Long timeout = vtconfig.getOptimizePollTimeout();
-    while (refreshing) {
-      refreshing = false;
-      rs = stmt
-          .executeQuery("select table_name, projection_name, status from vt_projection_refresh");
-      while (rs.next()) {
-        String table = rs.getString(1);
-        String stat = rs.getString(3);
-        if (stat.equals("refreshing") && tablesWithTemp.contains(table))
-          refreshing = true;
+    if(VerticaUtil.verticaVersion(conf, true) >= VerticaConfiguration.VERSION_3_5) {
+      stmt.execute("select deploy_design('" + designName + "', '" + designName + "')");
+    } else {
+      rs = stmt.executeQuery("select get_design_script('" + designName + "', '"
+          + designName + "')");
+      rs.next();
+      String[] projSet = rs.getString(1).split(";");
+      for (String proj : projSet) {
+        stmt.execute(proj);
       }
-
-      Thread.sleep(timeout);
-    }
-
-    // refresh done, move the ahm and drop the temp projections
-    stmt.execute("select make_ahm_now()");
-
-    for (String table : tablesWithTemp) {
-      for (String proj : tableProj.get(table)) {
-        stmt.execute("DROP PROJECTION " + proj);
+      stmt.execute("select start_refresh()");
+  
+      // poll for refresh complete
+      boolean refreshing = true;
+      Long timeout = vtconfig.getOptimizePollTimeout();
+      while (refreshing) {
+        refreshing = false;
+        rs = stmt
+            .executeQuery("select table_name, status from vt_projection_refresh");
+        while (rs.next()) {
+          String table = rs.getString(1);
+          String stat = rs.getString(2);
+          if (stat.equals("refreshing") && tablesWithTemp.contains(table))
+            refreshing = true;
+        }
+        rs.close();
+  
+        Thread.sleep(timeout);
       }
+  
+      // refresh done, move the ancient history mark (ahm) and drop the temp projections
+      stmt.execute("select make_ahm_now()");
+  
+      for (String table : tablesWithTemp) {
+        for (String proj : tableProj.get(table)) {
+          stmt.execute("DROP PROJECTION " + proj);
+        }
+      }
+
+      stmt.close();
     }
   }
 
@@ -235,4 +242,4 @@
     return new FileOutputCommitter(FileOutputFormat.getOutputPath(context),
         context);
   }
-}
+}
\ No newline at end of file

Modified: hadoop/mapreduce/branches/branch-0.21/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaRecord.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaRecord.java?rev=902706&r1=902705&r2=902706&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaRecord.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaRecord.java Mon Jan 25 05:00:00 2010
@@ -21,6 +21,7 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.sql.Date;
 import java.sql.ResultSet;
@@ -204,6 +205,9 @@
       case Types.REAL:
       case Types.DECIMAL:
       case Types.NUMERIC:
+        if (!(value instanceof BigDecimal))
+          throw new ClassCastException("Cannot cast "
+              + value.getClass().getName() + " to BigDecimal");
       case Types.DOUBLE:
         if (!(value instanceof Double) && !(value instanceof Float)
             && !(value instanceof DoubleWritable)
@@ -291,6 +295,8 @@
         this.types.add(Types.INTEGER);
       } else if (obj instanceof Short) {
         this.types.add(Types.SMALLINT);
+      } else if (obj instanceof BigDecimal) {
+        this.types.add(Types.NUMERIC);
       } else if (obj instanceof DoubleWritable) {
         this.types.add(Types.DOUBLE);
       } else if (obj instanceof Double) {
@@ -348,18 +354,18 @@
         break;
       case Types.BIGINT:
         if (obj instanceof Long) {
-          sb.append(((Long) obj).toString());
+          sb.append(obj.toString());
           break;
         }
       case Types.INTEGER:
         if (obj instanceof Integer) {
-          sb.append(((Integer) obj).toString());
+          sb.append(obj.toString());
           break;
         }
       case Types.TINYINT:
       case Types.SMALLINT:
         if (obj instanceof Short) {
-          sb.append(((Short) obj).toString());
+          sb.append(obj.toString());
           break;
         }
         if (obj instanceof LongWritable) {
@@ -377,18 +383,22 @@
       case Types.REAL:
       case Types.DECIMAL:
       case Types.NUMERIC:
+        if (obj instanceof BigDecimal) {
+          sb.append(obj.toString());
+          break;
+        }
       case Types.DOUBLE:
         if (obj instanceof Double) {
-          sb.append(((Double) obj).toString());
+          sb.append(obj.toString());
           break;
         }
         if (obj instanceof DoubleWritable) {
-          sb.append(((DoubleWritable) obj).toString());
+          sb.append(((DoubleWritable) obj).get());
           break;
         }
       case Types.FLOAT:
         if (obj instanceof Float) {
-          sb.append(((Float) obj).toString());
+          sb.append(obj.toString());
           break;
         }
         if (obj instanceof FloatWritable) {
@@ -494,6 +504,8 @@
       case Types.REAL:
       case Types.DECIMAL:
       case Types.NUMERIC:
+        values.add(new BigDecimal(Text.readString(in)));
+        break;
       case Types.DOUBLE:
         values.add(in.readDouble());
         break;
@@ -587,6 +599,8 @@
       case Types.REAL:
       case Types.DECIMAL:
       case Types.NUMERIC:
+        Text.writeString(out, obj.toString());
+        break;
       case Types.DOUBLE:
         out.writeDouble((Double) obj);
         break;
@@ -643,4 +657,4 @@
     }
   }
 
-}
+}
\ No newline at end of file

Modified: hadoop/mapreduce/branches/branch-0.21/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaRecordWriter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaRecordWriter.java?rev=902706&r1=902705&r2=902706&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaRecordWriter.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaRecordWriter.java Mon Jan 25 05:00:00 2010
@@ -31,6 +31,7 @@
 import java.util.List;
 
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
@@ -99,8 +100,9 @@
   @Override
   public void close(TaskAttemptContext context) throws IOException {
     try {
-      if (statement != null)
+      if (statement != null) {
         finishCopyIn.invoke(statement); // statement.finishCopyIn();
+      }
     } catch (Exception e) {
       throw new IOException(e);
     }

Modified: hadoop/mapreduce/branches/branch-0.21/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaStreamingRecordWriter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaStreamingRecordWriter.java?rev=902706&r1=902705&r2=902706&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaStreamingRecordWriter.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaStreamingRecordWriter.java Mon Jan 25 05:00:00 2010
@@ -27,6 +27,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 

Modified: hadoop/mapreduce/branches/branch-0.21/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaUtil.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaUtil.java?rev=902706&r1=902705&r2=902706&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaUtil.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaUtil.java Mon Jan 25 05:00:00 2010
@@ -31,13 +31,29 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.conf.Configuration;
 
 public class VerticaUtil {
   private static final Log LOG = LogFactory.getLog(VerticaUtil.class);
 
+  public static int verticaVersion(Configuration conf, boolean output) throws IOException {
+    int ver = -1;
+    try {
+    VerticaConfiguration vtconfig = new VerticaConfiguration(conf);
+    Connection conn = vtconfig.getConnection(output);
+    DatabaseMetaData dbmd = conn.getMetaData();
+    ver = dbmd.getDatabaseMajorVersion() * 100;
+    ver += dbmd.getDatabaseMinorVersion();
+    } catch(ClassNotFoundException e) { 
+      throw new IOException("Vertica Driver required to use Vertica Input or Output Formatters");
+    } catch (SQLException e) { throw new IOException(e); }
+    return ver;
+  }
+  
   public static void checkOutputSpecs(Configuration conf) throws IOException {
     VerticaConfiguration vtconfig = new VerticaConfiguration(conf);
 
@@ -67,20 +83,24 @@
       stmt = conn.createStatement();
 
       if (tableExists && dropTable) {
-        // TODO: need truncate support
-        // for now drop the table if it exists
-        // if def is empty, grab the columns first
-        if (def == null) {
-          rs = dbmd.getColumns(null, schema, table, null);
-          ArrayList<String> defs = new ArrayList<String>();
-          while (rs.next())
-            defs.add(rs.getString(4) + " " + rs.getString(5));
-          def = defs.toArray(new String[0]);
+        if(verticaVersion(conf, true) >= 305) {
+          stmt = conn.createStatement();
+          stmt.execute("TRUNCATE TABLE " + writerTable);
+        } else {
+          // for version < 3.0 drop the table if it exists
+          // if def is empty, grab the columns first to redfine the table
+          if (def == null) {
+            rs = dbmd.getColumns(null, schema, table, null);
+            ArrayList<String> defs = new ArrayList<String>();
+            while (rs.next())
+              defs.add(rs.getString(4) + " " + rs.getString(5));
+            def = defs.toArray(new String[0]);
+          }
+  
+          stmt = conn.createStatement();
+          stmt.execute("DROP TABLE " + writerTable + " CASCADE");
+          tableExists = false; // force create
         }
-
-        stmt = conn.createStatement();
-        stmt.execute("DROP TABLE " + writerTable + " CASCADE");
-        tableExists = false; // force create
       }
 
       // create table if it doesn't exist
@@ -120,7 +140,7 @@
   public static List<InputSplit> getSplits(JobContext context)
       throws IOException {
     Configuration conf = context.getConfiguration();
-    int numSplits = conf.getInt("mapred.map.tasks", 1);
+    int numSplits = conf.getInt("mapreduce.job.maps", 1);
     LOG.debug("creating splits up to " + numSplits);
     List<InputSplit> splits = new ArrayList<InputSplit>();
     int i = 0;

Modified: hadoop/mapreduce/branches/branch-0.21/src/contrib/vertica/src/test/org/apache/hadoop/vertica/TestVertica.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/vertica/src/test/org/apache/hadoop/vertica/TestVertica.java?rev=902706&r1=902705&r2=902706&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/contrib/vertica/src/test/org/apache/hadoop/vertica/TestVertica.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/contrib/vertica/src/test/org/apache/hadoop/vertica/TestVertica.java Mon Jan 25 05:00:00 2010
@@ -20,6 +20,7 @@
 
 import java.io.IOException;
 import java.lang.reflect.Array;
+import java.math.BigDecimal;
 import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
@@ -125,11 +126,11 @@
     types.add(Types.SMALLINT);
     values.add((short) 4); // SMALLINT
     types.add(Types.REAL);
-    values.add(15234342345.532637); // REAL
+    values.add(new BigDecimal(15234342345.532637)); // REAL
     types.add(Types.DECIMAL);
-    values.add(346223093.4256); // DECIMAL
+    values.add(new BigDecimal(346223093.4256)); // DECIMAL
     types.add(Types.NUMERIC);
-    values.add(209232301132.4203); // NUMERIC
+    values.add(new BigDecimal(209232301132.4203)); // NUMERIC
     types.add(Types.DOUBLE);
     values.add(934029342.234); // DOUBLE
     types.add(Types.FLOAT);



Mime
View raw message