hadoop-mapreduce-commits mailing list archives

From d...@apache.org
Subject svn commit: r816735 [2/2] - in /hadoop/mapreduce/trunk: ./ src/contrib/ src/contrib/vertica/ src/contrib/vertica/ivy/ src/contrib/vertica/src/ src/contrib/vertica/src/java/ src/contrib/vertica/src/java/org/ src/contrib/vertica/src/java/org/apache/ src/...
Date Fri, 18 Sep 2009 18:24:32 GMT
Added: hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaStreamingRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaStreamingRecordReader.java?rev=816735&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaStreamingRecordReader.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaStreamingRecordReader.java Fri Sep 18 18:24:31 2009
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.vertica;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+public class VerticaStreamingRecordReader extends RecordReader<Text, Text> {
+  ResultSet results = null;
+  VerticaRecord internalRecord = null;
+  long start = 0;
+  int pos = 0;
+  long length = 0;
+  VerticaInputSplit split = null;
+  String delimiter = VerticaConfiguration.DELIMITER;
+  String terminator = VerticaConfiguration.RECORD_TERMINATER;
+  Text key = new Text();
+  Text value = new Text();
+
+  public VerticaStreamingRecordReader(VerticaInputSplit split,
+      Configuration conf) throws Exception {
+    // run query for this segment
+    this.split = split;
+    split.configure(conf);
+    start = split.getStart();
+    length = split.getLength();
+    results = split.executeQuery();
+    internalRecord = new VerticaRecord(results, false);
+
+    VerticaConfiguration vtconfig = new VerticaConfiguration(conf);
+    delimiter = vtconfig.getInputDelimiter();
+    terminator = vtconfig.getInputRecordTerminator();
+  }
+
+  /** {@inheritDoc} */
+  public void initialize(InputSplit split, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    // nothing
+  }
+
+  /** {@inheritDoc} */
+  public void close() throws IOException {
+    try {
+      split.close();
+    } catch (SQLException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /** {@inheritDoc} */
+  public long getPos() throws IOException {
+    return pos;
+  }
+
+  /** {@inheritDoc} */
+  public float getProgress() throws IOException {
+    // TODO: figure out why length would be 0
+    if (length == 0)
+      return 1;
+    return pos / (float) length;
+  }
+
+  /** {@inheritDoc} */
+  public Text getCurrentKey() throws IOException, InterruptedException {
+    return key;
+  }
+
+  /** {@inheritDoc} */
+  public Text getCurrentValue() throws IOException, InterruptedException {
+    return value;
+  }
+
+  /** {@inheritDoc} */
+  public boolean nextKeyValue() throws IOException {
+    key.set(Long.toString(pos + start));
+    pos++;
+    try {
+      if (internalRecord.next()) {
+        value.set(internalRecord.toSQLString(delimiter, terminator));
+        return true;
+      }
+    } catch (SQLException e) {
+      throw new IOException(e);
+    }
+    return false;
+  }
+
+}

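For orientation, here is a minimal sketch of draining this reader outside a live job. The split and configuration are assumed to come from VerticaInputFormat; the class name and package placement are illustrative only, not part of this patch.

package org.apache.hadoop.vertica; // same package as the reader, for illustration

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;

public class ReaderSketch {
  // Drain one split; the split is assumed to come from VerticaInputFormat.getSplits().
  static void drain(VerticaInputSplit split, Configuration conf) throws Exception {
    VerticaStreamingRecordReader reader =
        new VerticaStreamingRecordReader(split, conf);
    try {
      while (reader.nextKeyValue()) {
        Text key = reader.getCurrentKey();     // row offset within the split
        Text value = reader.getCurrentValue(); // delimiter-joined row text
        System.out.println(key + "\t" + value);
      }
    } finally {
      reader.close();
    }
  }
}

Note that the reader keys each row by its offset from the split start, so keys are unique only within a single split.
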
Added: hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaStreamingRecordWriter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaStreamingRecordWriter.java?rev=816735&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaStreamingRecordWriter.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaStreamingRecordWriter.java Fri Sep 18 18:24:31 2009
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.vertica;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.sql.Connection;
+import java.sql.Statement;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+public class VerticaStreamingRecordWriter extends RecordWriter<Text, Text> {
+  private static final Log LOG = LogFactory
+      .getLog(VerticaStreamingRecordWriter.class);
+
+  String writerTable = null;
+  Connection connection = null;
+  Statement statement = null; // com.vertica.PGStatement
+  String copyStmt = null;
+
+  // Methods from com.vertica.PGStatement
+  Method startCopyIn = null;
+  Method finishCopyIn = null;
+  Method addStreamToCopyIn = null;
+
+  public VerticaStreamingRecordWriter(Connection connection, String copyStmt,
+      String writerTable) {
+    this.connection = connection;
+    this.copyStmt = copyStmt;
+    this.writerTable = writerTable;
+
+    try {
+      startCopyIn = Class.forName("com.vertica.PGStatement").getMethod(
+          "startCopyIn", String.class, ByteArrayInputStream.class);
+      finishCopyIn = Class.forName("com.vertica.PGStatement").getMethod(
+          "finishCopyIn");
+      addStreamToCopyIn = Class.forName("com.vertica.PGStatement").getMethod(
+          "addStreamToCopyIn", ByteArrayInputStream.class);
+    } catch (Exception ee) {
+      throw new RuntimeException(
+          "Vertica Formatter requires the Vertica JDBC driver", ee);
+    }
+  }
+
+  @Override
+  public void close(TaskAttemptContext context) throws IOException {
+    try {
+      if (statement != null) {
+        finishCopyIn.invoke(statement); // statement.finishCopyIn();
+      }
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public void write(Text table, Text record) throws IOException {
+    if (!table.toString().equals(writerTable))
+      throw new IOException("Writing to different table " + table.toString()
+          + ". Expecting " + writerTable);
+
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("writing " + record.toString());
+    }
+
+    ByteArrayInputStream bais = new ByteArrayInputStream(record.getBytes());
+    try {
+      if (statement == null) {
+        statement = connection.createStatement();
+        startCopyIn.invoke(statement, copyStmt, bais); // statement.startCopyIn(copyStmt,
+                                                        // bais);
+      } else
+        addStreamToCopyIn.invoke(statement, bais); // statement.addStreamToCopyIn(bais);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+}

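A sketch of driving the writer by hand, outside the OutputFormat. The connection URL, COPY statement, and streamtarget table below are assumptions for illustration (Vertica bulk-load syntax), not values taken from this patch.

package org.apache.hadoop.vertica; // same package as the writer, for illustration

import java.sql.Connection;
import java.sql.DriverManager;
import org.apache.hadoop.io.Text;

public class WriterSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical connection; VerticaOutputFormat normally builds it from
    // VerticaConfiguration.
    Connection conn = DriverManager.getConnection(
        "jdbc:vertica://localhost:5433/db?user=dbadmin&password=");
    // Assumed COPY statement and table; the delimiter must match the record text.
    String copy = "COPY streamtarget FROM STDIN DELIMITER '|'";
    VerticaStreamingRecordWriter writer =
        new VerticaStreamingRecordWriter(conn, copy, "streamtarget");
    writer.write(new Text("streamtarget"), new Text("1|one\n"));
    writer.close(null); // close() only finishes the COPY, so a null context is fine
  }
}
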
Added: hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaUtil.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaUtil.java?rev=816735&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaUtil.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaUtil.java Fri Sep 18 18:24:31 2009
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.vertica;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.conf.Configuration;
+
+public class VerticaUtil {
+  private static final Log LOG = LogFactory.getLog(VerticaUtil.class);
+
+  public static void checkOutputSpecs(Configuration conf) throws IOException {
+    VerticaConfiguration vtconfig = new VerticaConfiguration(conf);
+
+    String writerTable = vtconfig.getOutputTableName();
+    if (writerTable == null)
+      throw new IOException("Vertica output requires a table name defined by "
+          + VerticaConfiguration.OUTPUT_TABLE_NAME_PROP);
+    String[] def = vtconfig.getOutputTableDef();
+    boolean dropTable = vtconfig.getDropTable();
+
+    String schema = null;
+    String table = null;
+    String[] schemaTable = writerTable.split("\\.");
+    if (schemaTable.length == 2) {
+      schema = schemaTable[0];
+      table = schemaTable[1];
+    } else
+      table = schemaTable[0];
+
+    Statement stmt = null;
+    try {
+      Connection conn = vtconfig.getConnection(true);
+      DatabaseMetaData dbmd = conn.getMetaData();
+      ResultSet rs = dbmd.getTables(null, schema, table, null);
+      boolean tableExists = rs.next();
+
+      stmt = conn.createStatement();
+
+      if (tableExists && dropTable) {
+        // TODO: need truncate support
+        // for now drop the table if it exists
+        // if def is empty, grab the columns first
+        if (def == null) {
+          rs = dbmd.getColumns(null, schema, table, null);
+          ArrayList<String> defs = new ArrayList<String>();
+          while (rs.next())
+            defs.add(rs.getString(4) + " " + rs.getString(6)); // COLUMN_NAME TYPE_NAME
+          def = defs.toArray(new String[0]);
+        }
+
+        stmt.execute("DROP TABLE " + writerTable + " CASCADE");
+        tableExists = false; // force create
+      }
+
+      // create table if it doesn't exist
+      if (!tableExists) {
+        if (def == null)
+          throw new RuntimeException("Table " + writerTable
+              + " does not exist and no table definition provided");
+        if (schema != null) {
+          rs = dbmd.getSchemas(null, schema);
+          if (!rs.next())
+            stmt.execute("CREATE SCHEMA " + schema);
+        }
+        StringBuffer tabledef = new StringBuffer("CREATE TABLE ").append(
+            writerTable).append(" (");
+        for (String column : def)
+          tabledef.append(column).append(",");
+        tabledef.replace(tabledef.length() - 1, tabledef.length(), ")");
+
+        stmt.execute(tabledef.toString());
+        // TODO: create segmented projections
+        stmt.execute("select implement_temp_design('" + writerTable + "')");
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    } finally {
+      if (stmt != null)
+        try {
+          stmt.close();
+        } catch (SQLException e) {
+          throw new RuntimeException(e);
+        }
+    }
+  }
+
+  // TODO: catch when params required but missing
+  // TODO: better error message when count query is bad
+  public static List<InputSplit> getSplits(JobContext context)
+      throws IOException {
+    Configuration conf = context.getConfiguration();
+    int numSplits = conf.getInt("mapred.map.tasks", 1);
+    LOG.debug("creating splits up to " + numSplits);
+    List<InputSplit> splits = new ArrayList<InputSplit>();
+    int i = 0;
+    long start = 0;
+    long end = 0;
+    boolean limitOffset = true;
+
+    // This is the fancy part of mapping inputs: figuring out the splits.
+    // First get the params query or the explicit params.
+    VerticaConfiguration config = new VerticaConfiguration(conf);
+    String inputQuery = config.getInputQuery();
+
+    if (inputQuery == null)
+      throw new IOException("Vertica input requires query defined by "
+          + VerticaConfiguration.QUERY_PROP);
+
+    String paramsQuery = config.getParamsQuery();
+    Collection<List<Object>> params = config.getInputParameters();
+
+    // TODO: limit needs order by unique key
+    // TODO: what if there are more parameters than numsplits?
+    // prep a count(*) wrapper query and then populate the bind params for each
+    String countQuery = "SELECT COUNT(*) FROM (\n" + inputQuery + "\n) count";
+
+    if (paramsQuery != null) {
+      LOG.debug("creating splits using paramsQuery :" + paramsQuery);
+      Connection conn = null;
+      Statement stmt = null;
+      try {
+        conn = config.getConnection(false);
+        stmt = conn.createStatement();
+        ResultSet rs = stmt.executeQuery(paramsQuery);
+        ResultSetMetaData rsmd = rs.getMetaData();
+        while (rs.next()) {
+          limitOffset = false;
+          List<Object> segmentParams = new ArrayList<Object>();
+          for (int j = 1; j <= rsmd.getColumnCount(); j++) {
+            segmentParams.add(rs.getObject(j));
+          }
+          splits.add(new VerticaInputSplit(inputQuery, segmentParams, start,
+              end));
+        }
+      } catch (Exception e) {
+        throw new IOException(e);
+      } finally {
+        try {
+          if (stmt != null)
+            stmt.close();
+        } catch (SQLException e) {
+          throw new IOException(e);
+        }
+      }
+    } else if (params != null && params.size() > 0) {
+      LOG.debug("creating splits using " + params.size() + " params");
+      limitOffset = false;
+      for (List<Object> segmentParams : params) {
+        // if numSplits exceeds the number of params, limit/offset
+        // pairs would be needed for each group
+        // TODO: write code to generate the start/end pairs for each group
+        splits
+            .add(new VerticaInputSplit(inputQuery, segmentParams, start, end));
+      }
+    }
+
+    if (limitOffset) {
+      LOG.debug("creating splits using limit and offset");
+      Connection conn = null;
+      Statement stmt = null;
+      long count = 0;
+
+      try {
+        conn = config.getConnection(false);
+        stmt = conn.createStatement();
+        ResultSet rs = stmt.executeQuery(countQuery);
+        rs.next();
+        count = rs.getLong(1);
+      } catch (Exception e) {
+        throw new IOException(e);
+      } finally {
+        try {
+          if (stmt != null)
+            stmt.close();
+        } catch (SQLException e) {
+          throw new IOException(e);
+        }
+      }
+
+      long splitSize = count / numSplits;
+      end = splitSize;
+
+      LOG.debug("creating " + numSplits + " splits for " + count + " records");
+
+      for (i = 0; i < numSplits; i++) {
+        splits.add(new VerticaInputSplit(inputQuery, null, start, end));
+        start += splitSize;
+        end += splitSize;
+      }
+    }
+
+    LOG.debug("returning " + splits.size() + " final splits");
+    return splits;
+  }
+}

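To make the limit/offset branch of getSplits concrete, here is its start/end arithmetic in isolation (a sketch; it assumes end behaves as an exclusive offset). Note that the integer division drops count % numSplits trailing rows, which the TODOs above appear to anticipate.

public class SplitMath {
  public static void main(String[] args) {
    long count = 10;     // rows reported by the COUNT(*) wrapper query
    int numSplits = 3;   // mapred.map.tasks
    long splitSize = count / numSplits;  // 3; the remainder of 1 is dropped
    long start = 0, end = splitSize;
    for (int i = 0; i < numSplits; i++) {
      System.out.println("split " + i + ": [" + start + ", " + end + ")");
      start += splitSize;
      end += splitSize;
    }
    // prints [0, 3) [3, 6) [6, 9); row 9 never lands in a split
  }
}

One fix would be to extend the final split's end to count so the remainder rows are covered.
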
Added: hadoop/mapreduce/trunk/src/contrib/vertica/src/test/org/apache/hadoop/vertica/AllTests.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/vertica/src/test/org/apache/hadoop/vertica/AllTests.java?rev=816735&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/vertica/src/test/org/apache/hadoop/vertica/AllTests.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/vertica/src/test/org/apache/hadoop/vertica/AllTests.java Fri Sep 18 18:24:31 2009
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.vertica;
+
+import java.io.FileNotFoundException;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Properties;
+
+import junit.framework.Test;
+import junit.framework.TestSuite;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.vertica.VerticaConfiguration;
+
+/**
+ * All tests for the Vertica formatters (org.apache.hadoop.vertica).
+ */
+public final class AllTests {
+  private static final Log LOG = LogFactory.getLog(AllTests.class);
+
+  static final String VERTICA_HOSTNAME = "localhost";
+  static final String VERTICA_USERNAME = "dbadmin";
+  static final String VERTICA_PASSWORD = "";
+  static final String VERTICA_DATABASE = "db";
+
+  static String hostname;
+  static String username;
+  static String password;
+  static String database;
+
+  static boolean run_tests = false;
+  
+  public static String getHostname() {
+    return hostname;
+  }
+
+  public static String getUsername() {
+    return username;
+  }
+
+  public static String getPassword() {
+    return password;
+  }
+
+  public static String getDatabase() {
+    return database;
+  }
+
+  public static boolean isSetup() {
+    return run_tests;
+  } 
+
+  private AllTests() {
+  }
+
+  public static void configure() {
+    if (run_tests) {
+      return;
+    }
+
+    Properties properties = System.getProperties();
+
+    String test_setup = properties.getProperty("vertica.test_setup", "vertica_test.sql");
+    hostname = properties.getProperty("vertica.hostname", VERTICA_HOSTNAME);
+    username = properties.getProperty("vertica.username", VERTICA_USERNAME);
+    password = properties.getProperty("vertica.password", VERTICA_PASSWORD);
+    database = properties.getProperty("vertica.database", VERTICA_DATABASE);
+
+    LOG.info("Inititializing database with " + test_setup);
+    try {
+      Class.forName(VerticaConfiguration.VERTICA_DRIVER_CLASS);
+      String url = "jdbc:vertica://" + hostname + ":5433/" + database
+          + "?user=" + username + "&password=" + password;
+      LOG.info("Conencting to " + url);
+      Connection conn = DriverManager.getConnection(url);
+      Statement stmt = conn.createStatement();
+
+      InputStream strm_cmds = new FileInputStream(test_setup);
+
+      if (strm_cmds != null) {
+        byte[] b = new byte[strm_cmds.available()];
+        strm_cmds.read(b);
+        String[] cmds = new String(b).split("\n");
+
+        StringBuffer no_comment = new StringBuffer();
+        for (String cmd : cmds) {
+          if (!cmd.startsWith("--"))
+            no_comment.append(cmd).append("\n");
+        }
+
+        for (String cmd : no_comment.toString().split(";")) {
+          LOG.debug(cmd);
+          try {
+            stmt.execute(cmd);
+          } catch (SQLException e) {
+            LOG.debug(e.getSQLState() + " : " + e.getMessage());
+            if (e.getSQLState().equals("42V01"))
+              continue;
+            else
+              throw new RuntimeException(e);
+          }
+
+        }
+
+        run_tests = true;
+      }
+    } catch (ClassNotFoundException e) {
+      LOG.warn("No vertica driver found: " + e.getMessage() + " - skipping vertica tests");
+    } catch (SQLException e) {
+      LOG.warn("Could not connect to vertica database: " + e.getMessage() + " - skipping
vertica tests");
+    } catch (IOException e) {
+      LOG.warn("Missing vertica test setup file " + test_setup + ": " + e.getMessage() +
" - skipping vertica tests");
+    }
+  }
+
+  public static Test suite() {
+    configure();
+    TestSuite suite = new TestSuite("Tests for org.apache.hadoop.vertica");
+
+    if (run_tests) {
+      suite.addTestSuite(TestVertica.class);
+      suite.addTestSuite(TestExample.class);
+    }
+    return suite;
+  }
+
+}

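The suite reads its connection settings from JVM system properties, so a harness can point it at a non-default database before the first configure() call. A sketch, with placeholder hostname, database, and setup path:

import org.apache.hadoop.vertica.AllTests;

public class TestHarnessSketch {
  public static void main(String[] args) {
    // Equivalent to passing -Dvertica.hostname=... on the java command line;
    // the values below are placeholders.
    System.setProperty("vertica.hostname", "vertica.example.com");
    System.setProperty("vertica.database", "testdb");
    System.setProperty("vertica.username", "dbadmin");
    System.setProperty("vertica.test_setup", "testdata/vertica_test.sql");
    junit.textui.TestRunner.run(AllTests.suite()); // suite is empty if setup fails
  }
}
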
Added: hadoop/mapreduce/trunk/src/contrib/vertica/src/test/org/apache/hadoop/vertica/TestExample.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/vertica/src/test/org/apache/hadoop/vertica/TestExample.java?rev=816735&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/vertica/src/test/org/apache/hadoop/vertica/TestExample.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/vertica/src/test/org/apache/hadoop/vertica/TestExample.java Fri Sep 18 18:24:31 2009
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.vertica;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.vertica.VerticaConfiguration;
+import org.apache.hadoop.vertica.VerticaInputFormat;
+import org.apache.hadoop.vertica.VerticaOutputFormat;
+import org.apache.hadoop.vertica.VerticaRecord;
+
+public class TestExample extends VerticaTestCase implements Tool {
+
+  public TestExample(String name) {
+    super(name);
+  }
+
+  public static class Map extends
+      Mapper<LongWritable, VerticaRecord, Text, DoubleWritable> {
+
+    public void map(LongWritable key, VerticaRecord value, Context context)
+        throws IOException, InterruptedException {
+      List<Object> record = value.getValues();
+      context.write(new Text((String) record.get(1)), new DoubleWritable(
+          (Long) record.get(0)));
+    }
+  }
+
+  public static class Reduce extends
+      Reducer<Text, DoubleWritable, Text, VerticaRecord> {
+    VerticaRecord record = null;
+
+    public void setup(Context context) throws IOException, InterruptedException {
+      super.setup(context);
+      try {
+        record = VerticaOutputFormat.getValue(context.getConfiguration());
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+    }
+
+    protected void reduce(Text key, Iterable<DoubleWritable> values,
+        Context context) throws IOException, InterruptedException {
+      if (record == null) {
+        throw new IOException("No output record found");
+      }
+      
+      record.set(0, 125, true);
+      record.set(1, true, true);
+      record.set(2, 'c', true);
+      record.set(3, Calendar.getInstance().getTime(), true);
+      record.set(4, 234.526, true);
+      record.set(5, Calendar.getInstance().getTime(), true);
+      record.set(6, "foobar string", true);
+      record.set(7, new byte[10], true);
+      context.write(new Text("mrtarget"), record);
+    }
+  }
+
+  public Job getJob() throws IOException {
+    Job job = new Job();
+    job.setJarByClass(TestExample.class);
+    job.setJobName("vertica test");
+
+    Configuration conf = job.getConfiguration();
+    conf.set("mapred.job.tracker", "local");
+
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(VerticaRecord.class);
+    job.setInputFormatClass(VerticaInputFormat.class);
+    job.setOutputFormatClass(VerticaOutputFormat.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(DoubleWritable.class);
+    job.setMapperClass(Map.class);
+    job.setReducerClass(Reduce.class);
+    VerticaOutputFormat.setOutput(job, "mrtarget", true, "a int", "b boolean",
+        "c char(1)", "d date", "f float", "t timestamp", "v varchar",
+        "z varbinary");
+    VerticaConfiguration.configureVertica(conf,
+        new String[] { AllTests.getHostname() }, AllTests.getDatabase(),
+        AllTests.getUsername(), AllTests.getPassword());
+    return job;
+  }
+
+  @SuppressWarnings("serial")
+  public void testExample() throws Exception {
+    if(!AllTests.isSetup()) {
+      return;
+    }
+
+    Job job = getJob();
+    VerticaInputFormat.setInput(job, "select * from mrsource");
+    job.waitForCompletion(true);
+
+    job = getJob();
+    VerticaInputFormat.setInput(job, "select * from mrsource where key = ?",
+        "select distinct key from mrsource");
+    job.waitForCompletion(true);
+
+    job = getJob();
+    Collection<List<Object>> params = new HashSet<List<Object>>()
{
+    };
+    List<Object> param = new ArrayList<Object>();
+    param.add(new Integer(0));
+    params.add(param);
+    VerticaInputFormat.setInput(job, "select * from mrsource where key = ?",
+        params);
+    job.waitForCompletion(true);
+
+    job = getJob();
+    VerticaInputFormat.setInput(job, "select * from mrsource where key = ?",
+        "0", "1", "2");
+    job.waitForCompletion(true);
+   
+    VerticaOutputFormat.optimize(job.getConfiguration());
+  }
+
+  @Override
+  public int run(String[] arg0) throws Exception {
+    // TODO Auto-generated method stub
+    return 0;
+  }
+
+  @Override
+  public Configuration getConf() {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public void setConf(Configuration arg0) {
+    // TODO Auto-generated method stub
+
+  }
+}

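TestExample implements Tool but leaves run() as a stub; for completeness, here is a sketch of the conventional ToolRunner wiring a standalone driver would use. The driver class is hypothetical and not part of this patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.vertica.TestExample;

public class ExampleDriver {
  public static void main(String[] args) throws Exception {
    // run() above currently returns 0 without launching anything; a real
    // driver would move the getJob()/waitForCompletion calls into run().
    int rc = ToolRunner.run(new Configuration(), new TestExample("example"), args);
    System.exit(rc);
  }
}
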
Added: hadoop/mapreduce/trunk/src/contrib/vertica/src/test/org/apache/hadoop/vertica/TestVertica.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/vertica/src/test/org/apache/hadoop/vertica/TestVertica.java?rev=816735&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/vertica/src/test/org/apache/hadoop/vertica/TestVertica.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/vertica/src/test/org/apache/hadoop/vertica/TestVertica.java Fri Sep 18 18:24:31 2009
@@ -0,0 +1,332 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.vertica;
+
+import java.io.IOException;
+import java.lang.reflect.Array;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.vertica.VerticaConfiguration;
+import org.apache.hadoop.vertica.VerticaInputFormat;
+import org.apache.hadoop.vertica.VerticaInputSplit;
+import org.apache.hadoop.vertica.VerticaOutputFormat;
+import org.apache.hadoop.vertica.VerticaRecord;
+import org.apache.hadoop.vertica.VerticaRecordReader;
+import org.apache.hadoop.vertica.VerticaRecordWriter;
+
+public class TestVertica extends VerticaTestCase {
+
+  public TestVertica(String name) {
+    super(name);
+  }
+
+  protected void tearDown() throws Exception {
+    super.tearDown();
+  }
+
+  /**
+   * Fake class used to create a job conf
+   */
+  public class VerticaTestMR extends Configured {
+  }
+
+  public Job getVerticaJob() throws IOException {
+    Configuration conf = new Configuration(true);
+    Job job = new Job(conf, "TestVertica");
+    job.setJarByClass(VerticaTestMR.class);
+
+    VerticaConfiguration.configureVertica(job.getConfiguration(),
+        new String[] { AllTests.getHostname() }, AllTests.getDatabase(),
+        AllTests.getUsername(), AllTests.getPassword());
+    return job;
+  }
+
+  public VerticaInputSplit getVerticaSplit(boolean fake) throws Exception {
+    List<Object> segment_params = new ArrayList<Object>();
+    long start = 0;
+    long end = 0;
+    String input_query = "SELECT value FROM mrsource WHERE key = ?";
+
+    segment_params.add(3);
+    if (fake) {
+      segment_params.add(Calendar.getInstance().getTime());
+      segment_params.add("foobar");
+      start = 5;
+      end = 10;
+    }
+
+    VerticaInputSplit input = new VerticaInputSplit(input_query,
+        segment_params, start, end);
+    input.configure(getVerticaJob().getConfiguration());
+
+    return input;
+  }
+
+  public void testVerticaRecord() throws ParseException, IOException {
+    if(!AllTests.isSetup()) {
+      return;
+    }
+
+    List<Integer> types = new ArrayList<Integer>();
+    List<Object> values = new ArrayList<Object>();
+    DataOutputBuffer out = new DataOutputBuffer();
+    DataInputBuffer in = new DataInputBuffer();
+    DateFormat datefmt = new SimpleDateFormat("yyyy-MM-dd");
+    DateFormat timefmt = new SimpleDateFormat("HH:mm:ss");
+    DateFormat tmstmpfmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+    types.add(Types.BIGINT);
+    values.add(209348039485345L); // BIGINT
+    types.add(Types.INTEGER);
+    values.add(2342345); // INTEGER
+    types.add(Types.TINYINT);
+    values.add((short) 564); // TINYINT
+    types.add(Types.SMALLINT);
+    values.add((short) 4); // SMALLINT
+    types.add(Types.REAL);
+    values.add(15234342345.532637); // REAL
+    types.add(Types.DECIMAL);
+    values.add(346223093.4256); // DECIMAL
+    types.add(Types.NUMERIC);
+    values.add(209232301132.4203); // NUMERIC
+    types.add(Types.DOUBLE);
+    values.add(934029342.234); // DOUBLE
+    types.add(Types.FLOAT);
+    values.add((float) 62304.235); // FLOAT
+    types.add(Types.BINARY);
+    values.add(new byte[10]); // BINARY
+    types.add(Types.LONGVARBINARY);
+    values.add(new byte[10]); // LONGVARBINARY
+    types.add(Types.VARBINARY);
+    values.add(new byte[10]); // VARBINARY
+    types.add(Types.BOOLEAN);
+    values.add(Boolean.TRUE); // BOOLEAN
+    types.add(Types.CHAR);
+    values.add('x'); // CHAR
+    types.add(Types.LONGNVARCHAR);
+    values.add("2ialnnnnsfm9.3;olainlekf nasl f'\\4\r\n"); // LONGNVARCHAR
+    types.add(Types.LONGVARCHAR);
+    values.add("3jflin4f'\\4\r\n'"); // LONGVARCHAR
+    types.add(Types.NCHAR);
+    values.add("jf|ls4\\4\r\nf44sf"); // NCHAR
+    types.add(Types.VARCHAR);
+    values.add("4filjsf!@#$^&)*()"); // VARCHAR
+    types.add(Types.DATE);
+    values.add(new Date(datefmt.parse("2009-06-07").getTime())); // DATE
+    types.add(Types.TIME);
+    values.add(new Time(timefmt.parse("16:17:18.90").getTime())); // TIME
+    types.add(Types.TIMESTAMP);
+    values
+        .add(new Timestamp(tmstmpfmt.parse("2007-08-09 6:07:05.06").getTime())); // TIMESTAMP
+
+    String sql1 = recordTest(types, values, out, in, true);
+    
+    out = new DataOutputBuffer();
+    in = new DataInputBuffer();
+    String sql2 = recordTest(types, values, out, in, true);
+    
+    assertEquals("SQL Serialization test failed", sql1, sql2);
+  }
+
+  private String recordTest(List<Integer> types, List<Object> values,
+      DataOutputBuffer out, DataInputBuffer in, boolean date_string)
+      throws IOException {
+    VerticaRecord record = new VerticaRecord(null, types, values, date_string);
+
+    // TODO: test values as hashmap of column names
+
+    // write values into an output buffer
+    record.write(out);
+
+    // copy to an input buffer
+    in.reset(out.getData(), out.getLength());
+
+    // create a new record with new values
+    List<Object> new_values = new ArrayList<Object>();
+    record = new VerticaRecord(null, types, new_values, date_string);
+
+    // read back into values
+    record.readFields(in);
+
+    // compare values
+    for(int i = 0; i < values.size(); i++)
+      if(values.get(i).getClass().isArray()) {
+        Object a = values.get(i);
+        Object b = new_values.get(i);
+        for(int j = 0; j < Array.getLength(a); j++)
+          assertEquals("Vertica Record serialized value " + i + "[" + j
+              + "] does not match", Array.get(a, j), Array.get(b, j));
+      }
+      else {
+        assertEquals("Vertica Record serialized value " + i + " does not match",
+            values.get(i), new_values.get(i));
+      }
+
+    // data in sql form
+    return record.toSQLString();
+  }
+
+  public void testVerticaSplit() throws Exception {
+    if(!AllTests.isSetup()) {
+      return;
+    }
+
+    VerticaInputSplit input = getVerticaSplit(true);
+    VerticaInputSplit rem_input = new VerticaInputSplit();
+
+    DataOutputBuffer out = new DataOutputBuffer();
+    DataInputBuffer in = new DataInputBuffer();
+
+    input.write(out);
+
+    in.reset(out.getData(), out.getLength());
+
+    rem_input.readFields(in);
+    assertEquals("Serialized segment params do not match", rem_input.getSegmentParams(),
input.getSegmentParams());
+    assertEquals("Serialized start does not match", rem_input.getStart(), input.getStart());
+    assertEquals("Serialized length does not match", rem_input.getLength(), input.getLength());
+  }
+
+  public void testVerticaReader() throws Exception {
+    if(!AllTests.isSetup()) {
+      return;
+    }
+
+    VerticaInputSplit input = getVerticaSplit(false);
+    VerticaRecordReader reader = new VerticaRecordReader(input, input
+        .getConfiguration());
+    TaskAttemptContext context = new TaskAttemptContext(input
+        .getConfiguration(), new TaskAttemptID());
+    reader.initialize(input, context);
+
+    boolean hasValue = reader.nextKeyValue();
+    assertEquals("There should be a record in the database", hasValue, true);
+    
+    LongWritable key = reader.getCurrentKey();
+    VerticaRecord value = reader.getCurrentValue();
+
+    assertEquals("Key should be 1 for first record", key.get(), 1);
+    assertEquals("Result type should be VARCHAR", ((Integer)value.getTypes().get(0)).intValue(),
Types.VARCHAR);
+    assertEquals("Result value should be three", value.getValues().get(0), "three");
+    reader.close();
+  }
+
+  public void validateInput(Job job) throws IOException {
+    VerticaInputFormat input = new VerticaInputFormat();
+    List<InputSplit> splits = null;
+
+    Configuration conf = job.getConfiguration();
+    conf.setInt("mapred.map.tasks", 1);
+    JobContext context = new JobContext(conf, new JobID());
+
+    splits = input.getSplits(context);
+    assert splits.size() == 1;
+
+    conf.setInt("mapred.map.tasks", 3);
+    splits = input.getSplits(context);
+    assert splits.size() == 3;
+
+    conf.setInt("mapred.map.tasks", 10);
+    splits = input.getSplits(context);
+    assert splits.size() == 10;
+  }
+
+  public void testVerticaInput() throws IOException {
+    if(!AllTests.isSetup()) {
+      return;
+    }
+
+    String input_query1 = "SELECT value FROM mrsource";
+    String input_query2 = "SELECT value FROM mrsource WHERE key = ?";
+    String segment_query = "SELECT y FROM bar";
+    List<List<Object>> segment_params = new ArrayList<List<Object>>();
+    for (int i = 0; i < 4; i++) {
+      ArrayList<Object> params = new ArrayList<Object>();
+      params.add(i);
+      segment_params.add(params);
+    }
+
+    Job job = getVerticaJob();
+    VerticaInputFormat.setInput(job, input_query1);
+    validateInput(job);
+
+    job = getVerticaJob();
+    VerticaInputFormat.setInput(job, input_query2, segment_query);
+    validateInput(job);
+
+    VerticaInputFormat.setInput(job, input_query2, segment_params);
+    validateInput(job);
+  }
+
+  public void testVerticaOutput() throws Exception {
+    if(!AllTests.isSetup()) {
+      return;
+    }
+
+    // TODO: test create schema
+    // TODO: test writable variants of data types
+    VerticaOutputFormat output = new VerticaOutputFormat();
+    Job job = getVerticaJob();
+    VerticaOutputFormat.setOutput(job, "mrtarget", true, "a int", "b boolean",
+        "c char(1)", "d date", "f float", "t timestamp", "v varchar",
+        "z varbinary");
+    output.checkOutputSpecs(job, true);
+    TaskAttemptContext context = new TaskAttemptContext(job.getConfiguration(),
+        new TaskAttemptID());
+    VerticaRecordWriter writer = (VerticaRecordWriter) output
+        .getRecordWriter(context);
+
+    Text table = new Text();
+    table.set("mrtarget");
+
+    VerticaRecord record = VerticaOutputFormat.getValue(job.getConfiguration());
+    record.set(0, 125, true);
+    record.set(1, true, true);
+    record.set(2, 'c', true);
+    record.set(3, Calendar.getInstance().getTime(), true);
+    record.set(4, 234.526, true);
+    record.set(5, Calendar.getInstance().getTime(), true);
+    record.set(6, "foobar string", true);
+    record.set(7, new byte[10], true);
+
+    writer.write(table, record);
+    writer.close(null);
+  }
+}

Added: hadoop/mapreduce/trunk/src/contrib/vertica/src/test/org/apache/hadoop/vertica/VerticaTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/vertica/src/test/org/apache/hadoop/vertica/VerticaTestCase.java?rev=816735&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/vertica/src/test/org/apache/hadoop/vertica/VerticaTestCase.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/vertica/src/test/org/apache/hadoop/vertica/VerticaTestCase.java Fri Sep 18 18:24:31 2009
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.vertica;
+
+import junit.framework.TestCase;
+
+public class VerticaTestCase extends TestCase {
+  public VerticaTestCase(String name) {
+    super(name);
+  }
+
+  {
+    AllTests.configure();
+  }
+}

Added: hadoop/mapreduce/trunk/src/contrib/vertica/testdata/vertica_test.sql
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/vertica/testdata/vertica_test.sql?rev=816735&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/vertica/testdata/vertica_test.sql (added)
+++ hadoop/mapreduce/trunk/src/contrib/vertica/testdata/vertica_test.sql Fri Sep 18 18:24:31 2009
@@ -0,0 +1,47 @@
+--/**
+-- * Licensed to the Apache Software Foundation (ASF) under one
+-- * or more contributor license agreements.  See the NOTICE file
+-- * distributed with this work for additional information
+-- * regarding copyright ownership.  The ASF licenses this file
+-- * to you under the Apache License, Version 2.0 (the
+-- * "License"); you may not use this file except in compliance
+-- * with the License.  You may obtain a copy of the License at
+-- *
+-- *     http://www.apache.org/licenses/LICENSE-2.0
+-- *
+-- * Unless required by applicable law or agreed to in writing, software
+-- * distributed under the License is distributed on an "AS IS" BASIS,
+-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- * See the License for the specific language governing permissions and
+-- * limitations under the License.
+-- */
+
+drop table mrsource cascade;
+create table mrsource (
+  key int,
+  value varchar(10)
+);
+
+drop table bar cascade;
+create table bar(
+  y int
+);
+
+select implement_temp_design('');
+
+insert into mrsource values(0, 'zero');
+insert into mrsource values(1, 'one');
+insert into mrsource values(2, 'two');
+insert into mrsource values(3, 'three');
+insert into mrsource values(4, 'four');
+insert into mrsource values(5, 'five');
+
+insert into bar values(0);
+insert into bar values(1);
+insert into bar values(2);
+insert into bar values(3);
+insert into bar values(4);
+insert into bar values(5);
+
+commit;
+select advance_epoch();


