hadoop-mapreduce-commits mailing list archives

From d...@apache.org
Subject svn commit: r816735 [1/2] - in /hadoop/mapreduce/trunk: ./ src/contrib/ src/contrib/vertica/ src/contrib/vertica/ivy/ src/contrib/vertica/src/ src/contrib/vertica/src/java/ src/contrib/vertica/src/java/org/ src/contrib/vertica/src/java/org/apache/ src/...
Date Fri, 18 Sep 2009 18:24:32 GMT
Author: ddas
Date: Fri Sep 18 18:24:31 2009
New Revision: 816735

URL: http://svn.apache.org/viewvc?rev=816735&view=rev
Log:
MAPREDUCE-775. Add native and streaming support for Vertica as an input or output format taking advantage of parallel read and write properties of the DBMS. Contributed by Omer Trajman.
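
For illustration only, a minimal job setup against the new API might look like
this (host names, credentials, query, and table definition are placeholders,
not part of the patch):

    Configuration conf = new Configuration();
    VerticaConfiguration.configureVertica(conf,
        new String[] { "host1", "host2" }, "db", "dbadmin", "");

    Job job = new Job(conf, "vertica-example");
    // VerticaInputFormat.setInput also sets the input format class
    VerticaInputFormat.setInput(job, "SELECT a, b FROM facts");
    job.setOutputFormatClass(VerticaOutputFormat.class);
    VerticaOutputFormat.setOutput(job, "results", true,
        "a int", "b varchar(10)");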

Added:
    hadoop/mapreduce/trunk/src/contrib/vertica/
    hadoop/mapreduce/trunk/src/contrib/vertica/build.xml
    hadoop/mapreduce/trunk/src/contrib/vertica/ivy/
    hadoop/mapreduce/trunk/src/contrib/vertica/ivy.xml
    hadoop/mapreduce/trunk/src/contrib/vertica/ivy/libraries.properties
    hadoop/mapreduce/trunk/src/contrib/vertica/src/
    hadoop/mapreduce/trunk/src/contrib/vertica/src/java/
    hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/
    hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/
    hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/
    hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/
    hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaConfiguration.java
    hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaInputFormat.java
    hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaInputSplit.java
    hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaOutputFormat.java
    hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaRecord.java
    hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaRecordReader.java
    hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaRecordWriter.java
    hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaStreamingInput.java
    hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaStreamingOutput.java
    hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaStreamingRecordReader.java
    hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaStreamingRecordWriter.java
    hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaUtil.java
    hadoop/mapreduce/trunk/src/contrib/vertica/src/test/
    hadoop/mapreduce/trunk/src/contrib/vertica/src/test/org/
    hadoop/mapreduce/trunk/src/contrib/vertica/src/test/org/apache/
    hadoop/mapreduce/trunk/src/contrib/vertica/src/test/org/apache/hadoop/
    hadoop/mapreduce/trunk/src/contrib/vertica/src/test/org/apache/hadoop/vertica/
    hadoop/mapreduce/trunk/src/contrib/vertica/src/test/org/apache/hadoop/vertica/AllTests.java
    hadoop/mapreduce/trunk/src/contrib/vertica/src/test/org/apache/hadoop/vertica/TestExample.java
    hadoop/mapreduce/trunk/src/contrib/vertica/src/test/org/apache/hadoop/vertica/TestVertica.java
    hadoop/mapreduce/trunk/src/contrib/vertica/src/test/org/apache/hadoop/vertica/VerticaTestCase.java
    hadoop/mapreduce/trunk/src/contrib/vertica/testdata/
    hadoop/mapreduce/trunk/src/contrib/vertica/testdata/vertica_test.sql
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/build.xml

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=816735&r1=816734&r2=816735&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Sep 18 18:24:31 2009
@@ -114,6 +114,10 @@
     MAPREDUCE-777. Brand new apis to track and query jobs as a
     replacement for JobClient. (Amareshwari Sriramadasu via acmurthy)
 
+    MAPREDUCE-775. Add native and streaming support for Vertica as an input
+    or output format taking advantage of parallel read and write properties of 
+    the DBMS. (Omer Trajman via ddas)
+
   IMPROVEMENTS
 
     MAPREDUCE-816. Rename "local" mysql import to "direct" in Sqoop.

Modified: hadoop/mapreduce/trunk/src/contrib/build.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/build.xml?rev=816735&r1=816734&r2=816735&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/build.xml (original)
+++ hadoop/mapreduce/trunk/src/contrib/build.xml Fri Sep 18 18:24:31 2009
@@ -57,6 +57,7 @@
       <fileset dir="." includes="mrunit/build.xml"/> 
       <fileset dir="." includes="dynamic-scheduler/build.xml"/>
       <fileset dir="." includes="gridmix/build.xml"/>
+      <fileset dir="." includes="vertica/build.xml"/>
     </subant>
     <available file="${build.contrib.dir}/testsfailed" property="testsfailed"/>
     <fail if="testsfailed">Tests failed!</fail>

Added: hadoop/mapreduce/trunk/src/contrib/vertica/build.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/vertica/build.xml?rev=816735&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/vertica/build.xml (added)
+++ hadoop/mapreduce/trunk/src/contrib/vertica/build.xml Fri Sep 18 18:24:31 2009
@@ -0,0 +1,104 @@
+<?xml version="1.0"?>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<!--
+Before you can run these subtargets directly, you need
+to call at top-level: ant deploy-contrib compile-core-test
+-->
+<project name="vertica" default="jar">
+
+  <import file="../build-contrib.xml"/>
+  <property environment="env"/>
+
+  <!-- ================================================================== -->
+  <!-- Run unit tests                                                     -->
+  <!-- Override with our own version so we can set hadoop.alt.classpath   -->
+  <!-- and Hadoop logger properties                                       -->
+  <!-- ================================================================== -->
+  <target name="test" depends="compile-test, compile" if="test.available">
+    <echo message="contrib: ${name}"/>
+    <delete dir="${hadoop.log.dir}"/>
+    <mkdir dir="${hadoop.log.dir}"/>
+    <delete dir="${build.test}/data"/>
+    <mkdir dir="${build.test}/data" />
+    <junit
+      printsummary="yes" showoutput="${test.output}"
+      haltonfailure="no" fork="yes" maxmemory="256m"
+      errorProperty="tests.failed" failureProperty="tests.failed"
+      timeout="${test.timeout}"
+      dir="${build.test}/data">
+
+      <!-- uncomment this if you want to attach a debugger -->
+      <!--
+      <jvmarg line="-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=2601" />
+      -->
+
+      <sysproperty key="test.build.data" value="${build.test}/data"/>
+      <sysproperty key="build.test" value="${build.test}"/>
+      <sysproperty key="contrib.name" value="${name}"/>
+
+      <!--
+           Added property needed to use the .class files for compilation
+           instead of depending on hadoop-*-core.jar
+      -->
+      <sysproperty key="hadoop.alt.classpath"
+        value="${hadoop.root}/build/classes" />
+
+      <!-- we want more log4j output when running unit tests -->
+      <sysproperty key="hadoop.root.logger"
+        value="DEBUG,console" />
+
+      <!-- requires fork=yes for:
+        relative File paths to use the specified user.dir
+        classpath to use build/contrib/*.jar
+      -->
+      <sysproperty key="user.dir" value="${build.test}/data"/>
+
+      <sysproperty key="fs.default.name" value="${fs.default.name}"/>
+      <sysproperty key="hadoop.test.localoutputfile" value="${hadoop.test.localoutputfile}"/>
+      <sysproperty key="hadoop.log.dir" value="${hadoop.log.dir}"/>
+
+      <!-- Path to SQL setup script. -->
+      <sysproperty key="vertica.test_setup" value="${basedir}/testdata/vertica_test.sql" />
+
+      <!-- Vertica database parameters -->
+      <sysproperty key="vertica.hostname" value="localhost" />
+      <sysproperty key="vertica.database" value="db" />
+      <sysproperty key="vertica.username" value="dbadmin" />
+      <sysproperty key="vertica.password" value="" />
+	
+      <!-- tools.jar from Sun JDK also required to invoke javac. -->
+      <classpath>
+        <path refid="test.classpath"/>
+        <path refid="contrib-classpath"/>
+        <pathelement path="${env.JAVA_HOME}/lib/tools.jar" />
+      </classpath>
+      <formatter type="${test.junit.output.format}" />
+      <batchtest todir="${build.test}" unless="testcase">
+        <fileset dir="${src.test}"
+                 includes="**/Test*.java" excludes="**/${test.exclude}.java" />
+      </batchtest>
+      <batchtest todir="${build.test}" if="testcase">
+        <fileset dir="${src.test}" includes="**/${testcase}.java"/>
+      </batchtest>
+    </junit>
+    <fail if="tests.failed">Tests failed!</fail>
+  </target>
+
+</project>
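
To run the new contrib tests against a live Vertica instance matching the
parameters above, the usual contrib test flow should apply: build the
prerequisites at the top level (ant deploy-contrib compile-core-test, per the
comment in this build.xml), then run ant test from src/contrib/vertica,
optionally narrowed to a single test with -Dtestcase=TestVertica.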

Added: hadoop/mapreduce/trunk/src/contrib/vertica/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/vertica/ivy.xml?rev=816735&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/vertica/ivy.xml (added)
+++ hadoop/mapreduce/trunk/src/contrib/vertica/ivy.xml Fri Sep 18 18:24:31 2009
@@ -0,0 +1,64 @@
+<?xml version="1.0" ?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+  
+       http://www.apache.org/licenses/LICENSE-2.0
+    
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<ivy-module version="1.0">
+  <info organisation="org.apache.hadoop" module="${ant.project.name}">
+    <license name="Apache 2.0"/>
+    <ivyauthor name="Apache Hadoop Team" url="http://hadoop.apache.org"/>
+    <description>
+        Apache Hadoop
+    </description>
+  </info>
+  <configurations defaultconfmapping="default">
+    <!--these match the Maven configurations-->
+    <conf name="default" extends="master,runtime"/>
+    <conf name="master" description="contains the artifact but no dependencies"/>
+    <conf name="runtime" description="runtime but not the artifact" />
+
+    <conf name="common" visibility="private"
+      extends="runtime"
+      description="artifacts needed to compile/test the application"/>
+    <conf name="test" visibility="private" extends="runtime"/>
+  </configurations>
+
+  <publications>
+    <!--get the artifact from our module name-->
+    <artifact conf="master"/>
+  </publications>
+  <dependencies>
+    <dependency org="commons-logging"
+      name="commons-logging"
+      rev="${commons-logging.version}"
+      conf="common->default"/>
+    <dependency org="commons-httpclient"
+      name="commons-httpclient"
+      rev="${commons-httpclient.version}"
+      conf="common->default"/>
+    <dependency org="commons-cli"
+      name="commons-cli"
+      rev="${commons-cli.version}"
+      conf="common->default"/>
+    <dependency org="junit"
+      name="junit"
+      rev="${junit.version}"
+      conf="common->default"/>
+    <dependency org="log4j"
+      name="log4j"
+      rev="${log4j.version}"
+      conf="common->master"/>
+  </dependencies>
+</ivy-module>

Added: hadoop/mapreduce/trunk/src/contrib/vertica/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/vertica/ivy/libraries.properties?rev=816735&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/vertica/ivy/libraries.properties (added)
+++ hadoop/mapreduce/trunk/src/contrib/vertica/ivy/libraries.properties Fri Sep 18 18:24:31 2009
@@ -0,0 +1,21 @@
+#   Licensed to the Apache Software Foundation (ASF) under one or more
+#   contributor license agreements.  See the NOTICE file distributed with
+#   this work for additional information regarding copyright ownership.
+#   The ASF licenses this file to You under the Apache License, Version 2.0
+#   (the "License"); you may not use this file except in compliance with
+#   the License.  You may obtain a copy of the License at
+#  
+#       http://www.apache.org/licenses/LICENSE-2.0
+#    
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+
+#This properties file lists the versions of the various artifacts used by the vertica contrib.
+#It drives ivy and the generation of a maven POM
+
+#Please list the dependencies name with version if they are different from the ones
+#listed in the global libraries.properties file (in alphabetical order)
+

Added: hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaConfiguration.java?rev=816735&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaConfiguration.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaConfiguration.java Fri Sep 18 18:24:31 2009
@@ -0,0 +1,464 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.vertica;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * A container for configuration property names for jobs with Vertica
+ * input/output.
+ * 
+ * The job can be configured using the static methods in this class,
+ * {@link VerticaInputFormat}, and {@link VerticaOutputFormat}. Alternatively,
+ * the properties can be set in the configuration with proper values.
+ * 
+ * @see VerticaConfiguration#configureVertica(Configuration, String[], String,
+ *      String, String)
+ * @see VerticaConfiguration#configureVertica(Configuration, String[], String,
+ *      String, String, String[], String, String, String)
+ * @see VerticaInputFormat#setInput(Job, String)
+ * @see VerticaInputFormat#setInput(Job, String, Collection<List<Object>>)
+ * @see VerticaInputFormat#setInput(Job, String, String)
+ * @see VerticaInputFormat#setInput(Job, String, String...)
+ * @see VerticaOutputFormat#setOutput(Job, String)
+ * @see VerticaOutputFormat#setOutput(Job, String, Collection<VerticaTable>)
+ * @see VerticaOutputFormat#setOutput(Job, String, boolean)
+ * @see VerticaOutputFormat#setOutput(Job, String, boolean, String...)
+ */
+public class VerticaConfiguration {
+  /** Class name for Vertica JDBC Driver */
+  public static final String VERTICA_DRIVER_CLASS = "com.vertica.Driver";
+
+  /** Host names to connect to; one is selected at random */
+  public static final String HOSTNAMES_PROP = "mapred.vertica.hostnames";
+
+  /** Name of database to connect to */
+  public static final String DATABASE_PROP = "mapred.vertica.database";
+
+  /** User name for Vertica */
+  public static final String USERNAME_PROP = "mapred.vertica.username";
+
+  /** Password for Vertica */
+  public static final String PASSWORD_PROP = "mapred.vertica.password";
+
+  /** Host names to connect to; one is selected at random */
+  public static final String OUTPUT_HOSTNAMES_PROP = "mapred.vertica.hostnames.output";
+
+  /** Name of database to connect to */
+  public static final String OUTPUT_DATABASE_PROP = "mapred.vertica.database.output";
+
+  /** User name for Vertica */
+  public static final String OUTPUT_USERNAME_PROP = "mapred.vertica.username.output";
+
+  /** Password for Vertica */
+  public static final String OUTPUT_PASSWORD_PROP = "mapred.vertica.password.output";
+
+  /** Query to run for input */
+  public static final String QUERY_PROP = "mapred.vertica.input.query";
+
+  /** Query to run to retrieve parameters */
+  public static final String QUERY_PARAM_PROP = "mapred.vertica.input.query.paramquery";
+
+  /** Static parameters for query */
+  public static final String QUERY_PARAMS_PROP = "mapred.vertica.input.query.params";
+
+  /** Optional input delimiter for streaming */
+  public static final String INPUT_DELIMITER_PROP = "mapred.vertica.input.delimiter";
+
+  /** Optional input terminator for streaming */
+  public static final String INPUT_TERMINATOR_PROP = "mapred.vertica.input.terminator";
+
+  /** Whether to marshal dates as strings */
+  public static final String DATE_STRING = "mapred.vertica.date_as_string";
+
+  /** Output table name */
+  public static final String OUTPUT_TABLE_NAME_PROP = "mapred.vertica.output.table.name";
+
+  /** Definition of output table types */
+  public static final String OUTPUT_TABLE_DEF_PROP = "mapred.vertica.output.table.def";
+
+  /** Whether to drop tables */
+  public static final String OUTPUT_TABLE_DROP = "mapred.vertica.output.table.drop";
+
+  /** Optional output format delimiter */
+  public static final String OUTPUT_DELIMITER_PROP = "mapred.vertica.output.delimiter";
+
+  /** Optional output format terminator */
+  public static final String OUTPUT_TERMINATOR_PROP = "mapred.vertica.output.terminator";
+
+  /**
+   * Override the sleep interval used by optimize when polling for newly
+   * refreshed projections
+   */
+  public static final String OPTIMIZE_POLL_TIMER_PROP = "mapred.vertica.optimize.poll";
+
+  /**
+   * Sets the Vertica database connection information in the
+   * {@link Configuration}
+   * 
+   * @param conf
+   *          the configuration
+   * @param hostnames
+   *          one or more hosts in the Vertica cluster
+   * @param database
+   *          the name of the Vertica database
+   * @param username
+   *          Vertica database username
+   * @param password
+   *          Vertica database password
+   */
+  public static void configureVertica(Configuration conf, String[] hostnames,
+      String database, String username, String password) {
+    conf.setStrings(HOSTNAMES_PROP, hostnames);
+    conf.set(DATABASE_PROP, database);
+    conf.set(USERNAME_PROP, username);
+    conf.set(PASSWORD_PROP, password);
+  }
+
+  /**
+   * Sets the source and output Vertica database connection information in
+   * the {@link Configuration}
+   * 
+   * @param conf
+   *          the configuration
+   * @param hostnames
+   *          one or more hosts in the source Cluster
+   * @param database
+   *          the name of the source Vertica database
+   * @param username
+   *          for the source Vertica database
+   * @param password
+   *          for the source Vertica database
+   * @param output_hostnames
+   *          one or more hosts in the output Cluster
+   * @param output_database
+   *          the name of the output Vertica database
+   * @param output_username
+   *          for the target Vertica database
+   * @param output_password
+   *          for the target Vertica database
+   */
+  public static void configureVertica(Configuration conf, String[] hostnames,
+      String database, String username, String password,
+      String[] output_hostnames, String output_database,
+      String output_username, String output_password) {
+    configureVertica(conf, hostnames, database, username, password);
+
+    conf.setStrings(OUTPUT_HOSTNAMES_PROP, output_hostnames);
+    conf.set(OUTPUT_DATABASE_PROP, output_database);
+    conf.set(OUTPUT_USERNAME_PROP, output_username);
+    conf.set(OUTPUT_PASSWORD_PROP, output_password);
+  }
+
+  private Configuration conf;
+
+  // default record terminator for writing output to Vertica
+  public static final String RECORD_TERMINATER = "\u0008";
+
+  // default delimiter for writing output to Vertica
+  public static final String DELIMITER = "\u0007";
+
+  // default optimize poll timeout
+  public static final int OPTIMIZE_POLL_TIMER = 1000;
+
+  VerticaConfiguration(Configuration conf) {
+    this.conf = conf;
+  }
+
+  public Configuration getConfiguration() {
+    return conf;
+  }
+
+  /**
+   * Returns a connection to a random host in the Vertica cluster
+   * 
+   * @param output
+   *          true if the connection is for writing
+   * @throws IOException
+   * @throws ClassNotFoundException
+   * @throws SQLException
+   */
+  Connection getConnection(boolean output) throws IOException,
+      ClassNotFoundException, SQLException {
+    try {
+      Class.forName(VERTICA_DRIVER_CLASS);
+    } catch (ClassNotFoundException e) {
+      throw new RuntimeException(e);
+    }
+
+    String[] hosts = conf.getStrings(HOSTNAMES_PROP);
+    String user = conf.get(USERNAME_PROP);
+    String pass = conf.get(PASSWORD_PROP);
+    String database = conf.get(DATABASE_PROP);
+
+    if (output) {
+      hosts = conf.getStrings(OUTPUT_HOSTNAMES_PROP, hosts);
+      user = conf.get(OUTPUT_USERNAME_PROP, user);
+      pass = conf.get(OUTPUT_PASSWORD_PROP, pass);
+      database = conf.get(OUTPUT_DATABASE_PROP, database);
+    }
+
+    if (hosts == null || hosts.length == 0)
+      throw new IOException("Vertica requires a hostname defined by "
+          + HOSTNAMES_PROP);
+    if (database == null)
+      throw new IOException("Vertica requires a database name defined by "
+          + DATABASE_PROP);
+    if (user == null)
+      throw new IOException("Vertica requires a username defined by "
+          + USERNAME_PROP);
+    Random r = new Random();
+    return DriverManager.getConnection("jdbc:vertica://"
+        + hosts[r.nextInt(hosts.length)] + ":5433/" + database, user, pass);
+  }
+
+  public String getInputQuery() {
+    return conf.get(QUERY_PROP);
+  }
+
+  /**
+   * Set the query to run; its results are passed to the mappers.
+   * 
+   * @param inputQuery
+   */
+  public void setInputQuery(String inputQuery) {
+    inputQuery = inputQuery.trim();
+    if (inputQuery.endsWith(";")) {
+      inputQuery = inputQuery.substring(0, inputQuery.length() - 1);
+    }
+    conf.set(QUERY_PROP, inputQuery);
+  }
+
+  /**
+   * Return the query used to retrieve parameters for the input query (if set)
+   * 
+   * @return Returns the query for input parameters
+   */
+  public String getParamsQuery() {
+    return conf.get(QUERY_PARAM_PROP);
+  }
+
+  /**
+   * Query used to retrieve parameters for the input query. The result set must
+   * match the input query parameters precisely.
+   * 
+   * @param segment_params_query
+   */
+  public void setParamsQuery(String segment_params_query) {
+    conf.set(QUERY_PARAM_PROP, segment_params_query);
+  }
+
+  /**
+   * Return static input parameters if set
+   * 
+   * @return Collection of list of objects representing input parameters
+   * @throws IOException
+   */
+  public Collection<List<Object>> getInputParameters() throws IOException {
+    Collection<List<Object>> values = null;
+    String[] query_params = conf.getStrings(QUERY_PARAMS_PROP);
+    if (query_params != null) {
+      values = new ArrayList<List<Object>>();
+      for (String str_params : query_params) {
+        DataInputBuffer in = new DataInputBuffer();
+        in.reset(StringUtils.hexStringToByte(str_params), str_params.length());
+        VerticaRecord record = new VerticaRecord();
+        record.readFields(in);
+        values.add(record.getValues());
+      }
+    }
+    return values;
+  }
+
+  /**
+   * Sets a collection of lists. Each list is passed to an input split and used
+   * as arguments to the input query.
+   * 
+   * @param segment_params
+   * @throws IOException
+   */
+  public void setInputParams(Collection<List<Object>> segment_params)
+      throws IOException {
+    String[] values = new String[segment_params.size()];
+    int i = 0;
+    for (List<Object> params : segment_params) {
+      DataOutputBuffer out = new DataOutputBuffer();
+      VerticaRecord record = new VerticaRecord(params, true);
+      record.write(out);
+      values[i++] = StringUtils.byteToHexString(out.getData());
+    }
+    conf.setStrings(QUERY_PARAMS_PROP, values);
+  }
+
+  /**
+   * For streaming return the delimiter to separate values to the mapper
+   * 
+   * @return Returns delimiter used to format streaming input data
+   */
+  public String getInputDelimiter() {
+    return conf.get(INPUT_DELIMITER_PROP, DELIMITER);
+  }
+
+  /**
+   * For streaming set the delimiter to separate values to the mapper
+   */
+  public void setInputDelimiter(String delimiter) {
+    conf.set(INPUT_DELIMITER_PROP, delimiter);
+  }
+
+  /**
+   * For streaming return the record terminator to separate values to the mapper
+   * 
+   * @return Returns record terminator for input data
+   */
+  public String getInputRecordTerminator() {
+    return conf.get(INPUT_TERMINATOR_PROP, RECORD_TERMINATER);
+  }
+
+  /**
+   * For streaming set the record terminator to separate values to the mapper
+   */
+  public void setInputRecordTerminator(String terminator) {
+    conf.set(INPUT_TERMINATOR_PROP, terminator);
+  }
+
+  /**
+   * Get the table that is the target of output
+   * 
+   * @return Returns table name for output
+   */
+  public String getOutputTableName() {
+    return conf.get(OUTPUT_TABLE_NAME_PROP);
+  }
+
+  /**
+   * Set table that is being loaded as output
+   * 
+   * @param tableName
+   */
+  public void setOutputTableName(String tableName) {
+    conf.set(OUTPUT_TABLE_NAME_PROP, tableName);
+  }
+
+  /**
+   * Return definition of columns for output table
+   * 
+   * @return Returns table definition for output table
+   */
+  public String[] getOutputTableDef() {
+    return conf.getStrings(OUTPUT_TABLE_DEF_PROP);
+  }
+
+  /**
+   * Set the definition of a table for output if it needs to be created
+   * 
+   * @param fieldNames
+   */
+  public void setOutputTableDef(String... fieldNames) {
+    conf.setStrings(OUTPUT_TABLE_DEF_PROP, fieldNames);
+  }
+
+  /**
+   * Return whether the output table is dropped before loading
+   * 
+   * @return Returns true if output table should be dropped before loading
+   */
+  public boolean getDropTable() {
+    return conf.getBoolean(OUTPUT_TABLE_DROP, false);
+  }
+
+  /**
+   * Set whether to drop the output table before loading
+   * 
+   * @param drop_table
+   */
+  public void setDropTable(boolean drop_table) {
+    conf.setBoolean(OUTPUT_TABLE_DROP, drop_table);
+  }
+
+  /**
+   * For streaming return the delimiter used by the reducer
+   * 
+   * @return Returns delimiter to use for output data
+   */
+  public String getOutputDelimiter() {
+    return conf.get(OUTPUT_DELIMITER_PROP, DELIMITER);
+  }
+
+  /**
+   * For streaming set the delimiter used by the reducer
+   * 
+   * @param delimiter
+   */
+  public void setOutputDelimiter(String delimiter) {
+    conf.set(OUTPUT_DELIMITER_PROP, delimiter);
+  }
+
+  /**
+   * For streaming return the record terminator used by the reducer
+   * 
+   * @return Returns the record terminator for output data
+   */
+  public String getOutputRecordTerminator() {
+    return conf.get(OUTPUT_TERMINATOR_PROP, RECORD_TERMINATER);
+  }
+
+  /**
+   * For streaming set the record terminator used by the reducer
+   * 
+   * @param terminator
+   */
+  public void setOutputRecordTerminator(String terminator) {
+    conf.set(OUTPUT_TERMINATOR_PROP, terminator);
+  }
+
+  /**
+   * Returns poll timer for optimize loop
+   * 
+   * @return Returns poll timer for optimize loop
+   */
+  public Long getOptimizePollTimeout() {
+    return conf.getLong(OPTIMIZE_POLL_TIMER_PROP, OPTIMIZE_POLL_TIMER);
+  }
+
+  /**
+   * Set the timeout for the optimize poll loop
+   * 
+   * @param timeout
+   */
+  public void setOptimizePollTimeout(Long timeout) {
+    conf.setLong(OPTIMIZE_POLL_TIMER_PROP, timeout);
+  }
+}
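
Equivalently (a sketch, not part of the patch), the connection settings can be
supplied through the raw property names defined above instead of the static
helpers; the values here are placeholders:

    Configuration conf = new Configuration();
    conf.setStrings(VerticaConfiguration.HOSTNAMES_PROP, "host1", "host2");
    conf.set(VerticaConfiguration.DATABASE_PROP, "db");
    conf.set(VerticaConfiguration.USERNAME_PROP, "dbadmin");
    conf.set(VerticaConfiguration.PASSWORD_PROP, "");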

Added: hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaInputFormat.java?rev=816735&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaInputFormat.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaInputFormat.java Fri Sep 18 18:24:31 2009
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.vertica;
+
+import java.io.IOException;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+  
+/**
+ * Input formatter that returns the results of a query executed against Vertica.
+ * The key is a record number within the result set of each mapper. The value
+ * is a VerticaRecord, which uses an interface similar to a JDBC ResultSet for
+ * returning values.
+ * 
+ */
+public class VerticaInputFormat extends
+    InputFormat<LongWritable, VerticaRecord> {
+
+  /**
+   * Set the input query for a job
+   * 
+   * @param job
+   * @param inputQuery
+   *          query to run against Vertica
+   */
+  public static void setInput(Job job, String inputQuery) {
+    job.setInputFormatClass(VerticaInputFormat.class);
+    VerticaConfiguration config = new VerticaConfiguration(job
+        .getConfiguration());
+    config.setInputQuery(inputQuery);
+  }
+
+  /**
+   * Set a parameterized input query for a job and the query that returns the
+   * parameters.
+   * 
+   * @param job
+   * @param inputQuery
+   *          SQL query that has parameters specified by question marks ("?")
+   * @param segmentParamsQuery
+   *          SQL query that returns parameters for the input query
+   */
+  public static void setInput(Job job, String inputQuery,
+      String segmentParamsQuery) {
+    job.setInputFormatClass(VerticaInputFormat.class);
+    VerticaConfiguration config = new VerticaConfiguration(job
+        .getConfiguration());
+    config.setInputQuery(inputQuery);
+    config.setParamsQuery(segmentParamsQuery);
+  }
+
+  /**
+   * Set the input query and any number of comma-delimited literal parameter
+   * lists
+   * 
+   * @param job
+   * @param inputQuery
+   *          SQL query that has parameters specified by question marks ("?")
+   * @param segmentParams
+   *          any number of comma-delimited strings with literal parameters to
+   *          substitute in the input query
+   */
+  @SuppressWarnings("serial")
+  public static void setInput(Job job, String inputQuery,
+      String... segmentParams) throws IOException {
+    // transform each param set into array
+    DateFormat datefmt = DateFormat.getDateInstance();
+    Collection<List<Object>> params = new HashSet<List<Object>>();
+    for (String strParams : segmentParams) {
+      List<Object> param = new ArrayList<Object>();
+
+      for (String strParam : strParams.split(",")) {
+        strParam = strParam.trim();
+        if (strParam.charAt(0) == '\''
+            && strParam.charAt(strParam.length() - 1) == '\'')
+          param.add(strParam.substring(1, strParam.length() - 1));
+        else {
+          try {
+            param.add(datefmt.parse(strParam));
+          } catch (ParseException e1) {
+            try {
+              param.add(Integer.parseInt(strParam));
+            } catch (NumberFormatException e2) {
+              throw new IOException("Error parsing argument " + strParam);
+            }
+          }
+        }
+      }
+
+      params.add(param);
+    }
+
+    setInput(job, inputQuery, params);
+  }
+
+  /**
+   * Set the input query and a collection of parameter lists
+   * 
+   * @param job
+   * @param inputQuery
+   *          SQL query that has parameters specified by question marks ("?")
+   * @param segmentParams
+   *          collection of ordered lists to substitute into the input query
+   * @throws IOException
+   */
+  public static void setInput(Job job, String inputQuery,
+      Collection<List<Object>> segmentParams) throws IOException {
+    job.setInputFormatClass(VerticaInputFormat.class);
+    VerticaConfiguration config = new VerticaConfiguration(job
+        .getConfiguration());
+    config.setInputQuery(inputQuery);
+    config.setInputParams(segmentParams);
+  }
+
+  /** {@inheritDoc} */
+  public RecordReader<LongWritable, VerticaRecord> createRecordReader(
+      InputSplit split, TaskAttemptContext context) throws IOException {
+    try {
+      return new VerticaRecordReader((VerticaInputSplit) split, context
+          .getConfiguration());
+    } catch (Exception e) {
+      // surface the failure instead of returning a null reader
+      throw new IOException(e);
+    }
+  }
+
+  /** {@inheritDoc} */
+  public List<InputSplit> getSplits(JobContext context) throws IOException {
+    return VerticaUtil.getSplits(context);
+  }
+}
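
A sketch of the parameterized overloads (query and values are placeholders):
each parameter list produces one input split, and each "?" in the query is
bound from it. Quoted values are passed as strings; unquoted values are tried
as dates and then as integers.

    // one split per literal parameter list
    VerticaInputFormat.setInput(job,
        "SELECT x, y FROM events WHERE day = ?",
        "'2009-09-01'", "'2009-09-02'");

    // or derive the parameter lists from a second query
    VerticaInputFormat.setInput(job,
        "SELECT x, y FROM events WHERE day = ?",
        "SELECT DISTINCT day FROM events");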

Added: hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaInputSplit.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaInputSplit.java?rev=816735&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaInputSplit.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaInputSplit.java Fri Sep 18 18:24:31 2009
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.vertica;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+/**
+ * Input split class for reading data from Vertica
+ * 
+ */
+public class VerticaInputSplit extends InputSplit implements Writable {
+  private static final Log LOG = LogFactory.getLog(VerticaInputSplit.class);
+
+  PreparedStatement stmt = null;
+  Connection connection = null;
+  VerticaConfiguration vtconfig = null;
+  String inputQuery = null;
+  List<Object> segmentParams = null;
+  long start = 0;
+  long end = 0;
+
+  /** {@inheritDoc} */
+  public VerticaInputSplit() {
+    LOG.trace("Input split default constructor");
+  }
+
+  /**
+   * Set the input query and a list of parameters to substitute when evaluating
+   * the query
+   * 
+   * @param inputQuery
+   *          SQL query to run
+   * @param segmentParams
+   *          list of parameters to substitute into the query
+   * @param start
+   *          the logical starting record number
+   * @param end
+   *          the logical ending record number
+   */
+  public VerticaInputSplit(String inputQuery, List<Object> segmentParams,
+      long start, long end) {
+    LOG.trace("Input split constructor with query and params");
+    this.inputQuery = inputQuery;
+    this.segmentParams = segmentParams;
+    this.start = start;
+    this.end = end;
+  }
+
+  /** {@inheritDoc} */
+  public void configure(Configuration conf) throws Exception {
+    LOG.trace("Input split configured");
+    vtconfig = new VerticaConfiguration(conf);
+    connection = vtconfig.getConnection(false);
+    connection.setAutoCommit(true);
+    connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
+  }
+
+  /**
+   * Return the parameters used for the input query
+   * 
+   * @return the list of parameters substituted into the input query
+   */
+  public List<Object> getSegmentParams() {
+    return segmentParams;
+  }
+
+  /**
+   * Run the query that, when executed, returns input for the mapper
+   * 
+   * @return the ResultSet holding this split's input records
+   * @throws Exception
+   */
+  public ResultSet executeQuery() throws Exception {
+    LOG.trace("Input split execute query");
+    long length = getLength();
+
+    if (length != 0)
+      inputQuery = "SELECT * FROM ( " + inputQuery
+          + " ) limited LIMIT ? OFFSET ?";
+
+    if (connection == null)
+      throw new Exception("Cannot execute query with no connection");
+    stmt = connection.prepareStatement(inputQuery);
+
+    int i = 1;
+    if (segmentParams != null)
+      for (Object param : segmentParams)
+        stmt.setObject(i++, param);
+
+    if (length != 0) {
+      stmt.setLong(i++, length);
+      stmt.setLong(i++, start);
+    }
+
+    ResultSet rs = stmt.executeQuery();
+    return rs;
+  }
+
+  /** {@inheritDoc} */
+  public void close() throws SQLException {
+    stmt.close();
+  }
+
+  /**
+   * @return The index of the first row to select
+   */
+  public long getStart() {
+    return start;
+  }
+
+  /**
+   * @return The index of the last row to select
+   */
+  public long getEnd() {
+    return end;
+  }
+
+  /**
+   * @return The total row count in this split
+   */
+  public long getLength() throws IOException {
+    // TODO: figure out how to return length when there is no start and end
+    return end - start;
+  }
+
+  /** {@inheritDoc} */
+  public String[] getLocations() throws IOException {
+    return new String[] {};
+  }
+
+  /** {@inheritDoc} */
+  public Configuration getConfiguration() {
+    return vtconfig.getConfiguration();
+  }
+
+  /** {@inheritDoc} */
+  public void readFields(DataInput in) throws IOException {
+    inputQuery = Text.readString(in);
+    long paramCount = in.readLong();
+    if (paramCount > 0) {
+      VerticaRecord record = new VerticaRecord();
+      record.readFields(in);
+      segmentParams = record.getValues();
+    }
+    start = in.readLong();
+    end = in.readLong();
+  }
+
+  /** {@inheritDoc} */
+  public void write(DataOutput out) throws IOException {
+    Text.writeString(out, inputQuery);
+    if (segmentParams != null && segmentParams.size() > 0) {
+      out.writeLong(segmentParams.size());
+      VerticaRecord record = new VerticaRecord(segmentParams, true);
+      record.write(out);
+    } else
+      out.writeLong(0);
+    out.writeLong(start);
+    out.writeLong(end);
+  }
+}
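
As a worked example (query and bounds are placeholders), a split covering
logical rows [1000, 2000) of the input query binds LIMIT to getLength()
(end - start) and OFFSET to start:

    VerticaInputSplit split =
        new VerticaInputSplit("SELECT a, b FROM facts", null, 1000, 2000);
    // executeQuery() then runs:
    //   SELECT * FROM ( SELECT a, b FROM facts ) limited LIMIT 1000 OFFSET 1000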

Added: hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaOutputFormat.java?rev=816735&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaOutputFormat.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaOutputFormat.java Fri Sep 18 18:24:31 2009
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.vertica;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+/**
+ * Output formatter for loading reducer output to Vertica
+ * 
+ */
+public class VerticaOutputFormat extends OutputFormat<Text, VerticaRecord> {
+  String delimiter = VerticaConfiguration.DELIMITER;
+  String terminator = VerticaConfiguration.RECORD_TERMINATER;
+
+  /**
+   * Set the output table
+   * 
+   * @param job
+   * @param tableName
+   */
+  public static void setOutput(Job job, String tableName) {
+    setOutput(job, tableName, false);
+  }
+
+  /**
+   * Set the output table and whether to drop it before loading
+   * 
+   * @param job
+   * @param tableName
+   * @param dropTable
+   */
+  public static void setOutput(Job job, String tableName, boolean dropTable) {
+    // delegate to the varargs overload (calling the three-argument form
+    // here would recurse forever)
+    setOutput(job, tableName, dropTable, new String[0]);
+  }
+
+  /**
+   * Set the output table, whether to drop it before loading and the create
+   * table specification if it doesn't exist
+   * 
+   * @param job
+   * @param tableName
+   * @param dropTable
+   * @param tableDef
+   *          list of column definitions such as "foo int", "bar varchar(10)"
+   */
+  public static void setOutput(Job job, String tableName, boolean dropTable,
+      String... tableDef) {
+    VerticaConfiguration vtconfig = new VerticaConfiguration(job
+        .getConfiguration());
+    vtconfig.setOutputTableName(tableName);
+    vtconfig.setOutputTableDef(tableDef);
+    vtconfig.setDropTable(dropTable);
+  }
+
+  // TODO: handle a collection of output tables via a private VerticaTable class
+
+  /** {@inheritDoc} */
+  public void checkOutputSpecs(JobContext context) throws IOException {
+    VerticaUtil.checkOutputSpecs(context.getConfiguration());
+    VerticaConfiguration vtconfig = new VerticaConfiguration(context
+        .getConfiguration());
+    delimiter = vtconfig.getOutputDelimiter();
+    terminator = vtconfig.getOutputRecordTerminator();
+  }
+
+  /**
+   * Test check specs (don't connect to db)
+   * 
+   * @param context
+   * @param test
+   *          true if testing
+   * @throws IOException
+   */
+  public void checkOutputSpecs(JobContext context, boolean test)
+      throws IOException {
+    VerticaUtil.checkOutputSpecs(context.getConfiguration());
+    VerticaConfiguration vtconfig = new VerticaConfiguration(context
+        .getConfiguration());
+    delimiter = vtconfig.getOutputDelimiter();
+    terminator = vtconfig.getOutputRecordTerminator();
+  }
+
+  /** {@inheritDoc} */
+  public RecordWriter<Text, VerticaRecord> getRecordWriter(
+      TaskAttemptContext context) throws IOException {
+
+    VerticaConfiguration config = new VerticaConfiguration(context
+        .getConfiguration());
+
+    String name = context.getJobName();
+    // TODO: use explicit date formats
+    String table = config.getOutputTableName();
+    String copyStmt = "COPY " + table + " FROM STDIN" + " DELIMITER '"
+        + delimiter + "' RECORD TERMINATOR '" + terminator + "' STREAM NAME '"
+        + name + "' DIRECT";
+
+    try {
+      Connection conn = config.getConnection(true);
+      return new VerticaRecordWriter(conn, copyStmt, table, delimiter,
+          terminator);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  /** Returns a VerticaRecord matching the configured output table definition */
+  public static VerticaRecord getValue(Configuration conf) throws Exception {
+    VerticaConfiguration config = new VerticaConfiguration(conf);
+    String table = config.getOutputTableName();
+    Connection conn = config.getConnection(true);
+    return (new VerticaRecordWriter(conn, "", table, config
+        .getOutputDelimiter(), config.getOutputRecordTerminator())).getValue();
+  }
+
+  /**
+   * Optionally called at the end of a job to optimize any newly created and
+   * loaded tables. Useful for new tables with more than 100k records.
+   * 
+   * @param conf
+   * @throws Exception
+   */
+  public static void optimize(Configuration conf) throws Exception {
+    VerticaConfiguration vtconfig = new VerticaConfiguration(conf);
+    Connection conn = vtconfig.getConnection(true);
+
+    // TODO: consider more tables and skip tables with non-temp projections 
+    String tableName = vtconfig.getOutputTableName();
+    Statement stmt = conn.createStatement();
+    ResultSet rs = null;
+    StringBuffer designTables = new StringBuffer(tableName);
+    HashSet<String> tablesWithTemp = new HashSet<String>();
+
+    //fully qualify the table name - defaults to public.<table>
+    if(tableName.indexOf(".") == -1) {
+      tableName = "public." + tableName;
+    }
+
+    //for now just add the single output table
+    tablesWithTemp.add(tableName);
+
+    // map from table name to set of projection names
+    HashMap<String, Collection<String>> tableProj = new HashMap<String, Collection<String>>();
+    rs = stmt.executeQuery("select schemaname, anchortablename, projname from vt_projection;");
+    while(rs.next()) {
+      String ptable = rs.getString(1) + "." + rs.getString(2);
+      if(!tableProj.containsKey(ptable)) {
+        tableProj.put(ptable, new HashSet<String>());
+      }
+      
+      tableProj.get(ptable).add(rs.getString(3));
+    }
+    
+    for(String table : tablesWithTemp) {
+      if(!tableProj.containsKey(table)) {
+        throw new RuntimeException("Cannot optimize table with no data: " + table);
+      }
+    }
+    
+    String designName = Integer.toString(conn.hashCode());
+    stmt.execute("select create_projection_design('" + designName + "', '', '"
+        + designTables.toString() + "')");
+
+    rs = stmt.executeQuery("select get_design_script('" + designName + "', '"
+        + designName + "')");
+    rs.next();
+    String[] projSet = rs.getString(1).split(";");
+    for (String proj : projSet) {
+      stmt.execute(proj);
+    }
+    stmt.execute("select start_refresh()");
+
+    // poll for the refresh to complete
+    boolean refreshing = true;
+    Long timeout = vtconfig.getOptimizePollTimeout();
+    while (refreshing) {
+      refreshing = false;
+      rs = stmt
+          .executeQuery("select table_name, projection_name, status from vt_projection_refresh");
+      while (rs.next()) {
+        String table = rs.getString(1);
+        String stat = rs.getString(3);
+        if (stat.equals("refreshing") && tablesWithTemp.contains(table))
+          refreshing = true;
+      }
+
+      Thread.sleep(timeout);
+    }
+
+    // refresh done; move the AHM (ancient history mark) and drop the temp projections
+    stmt.execute("select make_ahm_now()");
+
+    for (String table : tablesWithTemp) {
+      for (String proj : tableProj.get(table)) {
+        stmt.execute("DROP PROJECTION " + proj);
+      }
+    }
+  }
+
+  /** {@inheritDoc} */
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    return new FileOutputCommitter(FileOutputFormat.getOutputPath(context),
+        context);
+  }
+}
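
A sketch of the post-job hook described in the optimize javadoc (assumes the
enclosing method declares throws Exception):

    if (job.waitForCompletion(true)) {
      // optionally optimize the newly loaded table; worthwhile for new
      // tables with more than 100k records
      VerticaOutputFormat.optimize(job.getConfiguration());
    }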

Added: hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaRecord.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaRecord.java?rev=816735&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaRecord.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaRecord.java Fri Sep 18 18:24:31 2009
@@ -0,0 +1,631 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.vertica;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.VIntWritable;
+import org.apache.hadoop.io.VLongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Serializable record for records returned from and written to Vertica
+ * 
+ */
+public class VerticaRecord implements Writable {
+  ResultSet results = null;
+  ResultSetMetaData meta = null;
+  int columns = 0;
+  List<Integer> types = null;
+  List<Object> values = null;
+  List<String> names = null;
+  boolean dateString;
+  String delimiter = VerticaConfiguration.DELIMITER;
+  String terminator = VerticaConfiguration.RECORD_TERMINATER;
+
+  DateFormat datefmt = new SimpleDateFormat("yyyyMMdd");
+  DateFormat timefmt = new SimpleDateFormat("HHmmss");
+  DateFormat tmstmpfmt = new SimpleDateFormat("yyyyMMddHHmmss");
+  DateFormat sqlfmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+  public List<Object> getValues() {
+    return values;
+  }
+
+  public List<Integer> getTypes() {
+    return types;
+  }
+
+  /**
+   * Create a new VerticaRecord class out of a query result set
+   * 
+   * @param results
+   *          ResultSet returned from running input split query
+   * @param dateString
+   *          True if dates should be marshaled as strings
+   * @throws SQLException
+   */
+  VerticaRecord(ResultSet results, boolean dateString) throws SQLException {
+    this.results = results;
+    this.dateString = dateString;
+    meta = results.getMetaData();
+    columns = meta.getColumnCount();
+    names = new ArrayList<String>(columns);
+    types = new ArrayList<Integer>(columns);
+    values = new ArrayList<Object>(columns);
+    for (int i = 0; i < columns; i++) {
+      names.add(meta.getCatalogName(i + 1));
+      types.add(meta.getColumnType(i + 1));
+      values.add(null);
+    }
+  }
+
+  public VerticaRecord() {
+    this.types = new ArrayList<Integer>();
+    this.values = new ArrayList<Object>();
+  }
+
+  public VerticaRecord(List<String> names, List<Integer> types) {
+    this.names = names;
+    this.types = types;
+    values = new ArrayList<Object>();
+    for (@SuppressWarnings("unused") Integer type : types)
+      values.add(null);
+    columns = values.size();
+  }
+
+  public VerticaRecord(List<Object> values, boolean parseTypes) {
+    this.types = new ArrayList<Integer>();
+    this.values = values;
+    columns = values.size();
+    objectTypes();
+  }
+
+  /**
+   * Test interface for junit tests that do not require a database
+   * 
+   * @param names
+   * @param types
+   * @param values
+   * @param dateString
+   */
+  public VerticaRecord(List<String> names, List<Integer> types,
+      List<Object> values, boolean dateString) {
+    this.names = names;
+    this.types = types;
+    this.values = values;
+    this.dateString = dateString;
+    columns = types.size();
+    if (columns == 0) {
+      objectTypes();
+      columns = values.size();
+    }
+  }
+
+  public Object get(String name) throws Exception {
+    if (names == null || names.size() == 0)
+      throw new Exception("Cannot get value by name if names not initialized");
+    int i = names.indexOf(name);
+    if (i == -1)
+      throw new Exception("No column named " + name);
+    return get(i);
+  }
+
+  public Object get(int i) {
+    if (i < 0 || i >= values.size())
+      throw new IndexOutOfBoundsException("Index " + i
+          + " out of range for record with " + values.size() + " columns");
+    return values.get(i);
+  }
+
+  public void set(String name, Object value) throws Exception {
+    if (names == null || names.size() == 0)
+      throw new Exception("Cannot set value by name if names not initialized");
+    int i = names.indexOf(name);
+    if (i == -1)
+      throw new Exception("No column named " + name);
+    set(i, value);
+  }
+
+  /**
+   * Set a value by 0-indexed column position
+   * 
+   * @param i 0-indexed column position
+   * @param value new value for the column
+   */
+  public void set(Integer i, Object value) {
+    set(i, value, false);
+  }
+
+  /**
+   * Set a value by 0-indexed column position, optionally validating it
+   * against the column's declared JDBC type
+   * 
+   * @param i 0-indexed column position
+   * @param value new value for the column
+   * @param validate if true, reject values whose Java type does not match
+   */
+  public void set(Integer i, Object value, boolean validate) {
+    if (i < 0 || i >= values.size())
+      throw new IndexOutOfBoundsException("Index " + i
+          + " out of range for record with " + values.size() + " columns");
+    if (validate) {
+      Integer type = types.get(i);
+      switch (type) {
+      case Types.BIGINT:
+        if (!(value instanceof Long) && !(value instanceof Integer)
+            && !(value instanceof Short) && !(value instanceof LongWritable)
+            && !(value instanceof VLongWritable)
+            && !(value instanceof VIntWritable))
+          throw new ClassCastException("Cannot cast "
+              + value.getClass().getName() + " to Long");
+        break;
+      case Types.INTEGER:
+        if (!(value instanceof Integer) && !(value instanceof Short)
+            && !(value instanceof VIntWritable))
+          throw new ClassCastException("Cannot cast "
+              + value.getClass().getName() + " to Integer");
+        break;
+      case Types.TINYINT:
+      case Types.SMALLINT:
+        if (!(value instanceof Short))
+          throw new ClassCastException("Cannot cast "
+              + value.getClass().getName() + " to Short");
+        break;
+      case Types.REAL:
+      case Types.DECIMAL:
+      case Types.NUMERIC:
+      case Types.DOUBLE:
+        if (!(value instanceof Double) && !(value instanceof Float)
+            && !(value instanceof DoubleWritable)
+            && !(value instanceof FloatWritable))
+          throw new ClassCastException("Cannot cast "
+              + value.getClass().getName() + " to Double");
+        break;
+      case Types.FLOAT:
+        if (!(value instanceof Float) && !(value instanceof FloatWritable))
+          throw new ClassCastException("Cannot cast "
+              + value.getClass().getName() + " to Float");
+        break;
+      case Types.BINARY:
+      case Types.LONGVARBINARY:
+      case Types.VARBINARY:
+        if (!(value instanceof byte[]) && !(value instanceof BytesWritable))
+          throw new ClassCastException("Cannot cast "
+              + value.getClass().getName() + " to byte[]");
+        break;
+      case Types.BIT:
+      case Types.BOOLEAN:
+        if (!(value instanceof Boolean) && !(value instanceof BooleanWritable)
+            && !(value instanceof ByteWritable))
+          throw new ClassCastException("Cannot cast "
+              + value.getClass().getName() + " to Boolean");
+        break;
+      case Types.CHAR:
+        if (!(value instanceof Character) && !(value instanceof String))
+          throw new ClassCastException("Cannot cast "
+              + value.getClass().getName() + " to Character");
+        break;
+      case Types.LONGNVARCHAR:
+      case Types.LONGVARCHAR:
+      case Types.NCHAR:
+      case Types.NVARCHAR:
+      case Types.VARCHAR:
+        if (!(value instanceof String) && !(value instanceof Text))
+          throw new ClassCastException("Cannot cast "
+              + value.getClass().getName() + " to String");
+        break;
+      case Types.DATE:
+        if (!(value instanceof Date) && !(value instanceof java.util.Date))
+          throw new ClassCastException("Cannot cast "
+              + value.getClass().getName() + " to Date");
+        break;
+      case Types.TIME:
+        if (!(value instanceof Time) && !(value instanceof java.util.Date))
+          throw new ClassCastException("Cannot cast "
+              + value.getClass().getName() + " to Time");
+        break;
+      case Types.TIMESTAMP:
+        if (!(value instanceof Timestamp) && !(value instanceof java.util.Date))
+          throw new ClassCastException("Cannot cast "
+              + value.getClass().getName() + " to Timestamp");
+        break;
+      default:
+        throw new RuntimeException("Unknown type value " + types.get(i));
+      }
+    }
+    values.set(i, value);
+  }
+
+  public boolean next() throws SQLException {
+    if (results.next()) {
+      for (int i = 1; i <= columns; i++)
+        values.set(i - 1, results.getObject(i));
+      return true;
+    }
+    return false;
+  }
+
+  private void objectTypes() {
+    for (Object obj : values) {
+      if (obj == null) {
+        this.types.add(null);
+      } else if (obj instanceof Long) {
+        this.types.add(Types.BIGINT);
+      } else if (obj instanceof LongWritable) {
+        this.types.add(Types.BIGINT);
+      } else if (obj instanceof VLongWritable) {
+        this.types.add(Types.BIGINT);
+      } else if (obj instanceof VIntWritable) {
+        this.types.add(Types.INTEGER);
+      } else if (obj instanceof Integer) {
+        this.types.add(Types.INTEGER);
+      } else if (obj instanceof Short) {
+        this.types.add(Types.SMALLINT);
+      } else if (obj instanceof DoubleWritable) {
+        this.types.add(Types.DOUBLE);
+      } else if (obj instanceof Double) {
+        this.types.add(Types.DOUBLE);
+      } else if (obj instanceof Float) {
+        this.types.add(Types.FLOAT);
+      } else if (obj instanceof FloatWritable) {
+        this.types.add(Types.FLOAT);
+      } else if (obj instanceof byte[]) {
+        this.types.add(Types.BINARY);
+      } else if (obj instanceof ByteWritable) {
+        this.types.add(Types.BINARY);
+      } else if (obj instanceof Boolean) {
+        this.types.add(Types.BOOLEAN);
+      } else if (obj instanceof BooleanWritable) {
+        this.types.add(Types.BOOLEAN);
+      } else if (obj instanceof Character) {
+        this.types.add(Types.CHAR);
+      } else if (obj instanceof String) {
+        this.types.add(Types.VARCHAR);
+      } else if (obj instanceof BytesWritable) {
+        this.types.add(Types.VARCHAR);
+      } else if (obj instanceof Text) {
+        this.types.add(Types.VARCHAR);
+      } else if (obj instanceof java.util.Date) {
+        this.types.add(Types.DATE);
+      } else if (obj instanceof Date) {
+        this.types.add(Types.DATE);
+      } else if (obj instanceof Time) {
+        this.types.add(Types.TIME);
+      } else if (obj instanceof Timestamp) {
+        this.types.add(Types.TIMESTAMP);
+      } else {
+        throw new RuntimeException("Unknown type " + obj.getClass().getName()
+            + " passed to Vertica Record");
+      }
+    }
+  }
+
+  public String toSQLString() {
+    return toSQLString(delimiter, terminator);
+  }
+
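+  /**
+   * Render this record as one delimited line for COPY. As an illustrative
+   * example (values hypothetical): a record holding (1L, "foo", true) with
+   * delimiter '|' and terminator '\n' renders as "1|foo|true\n".
+   */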
+  public String toSQLString(String delimiterArg, String terminatorArg) {
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < columns; i++) {
+      Object obj = values.get(i);
+      Integer type = types.get(i);
+
+      // switch statement uses fall through to handle type variations
+      // e.g. type specified as BIGINT but passed in as Integer
+      switch (type) {
+      case Types.BIGINT:
+        if (obj instanceof Long) {
+          sb.append(((Long) obj).toString());
+          break;
+        }
+      case Types.INTEGER:
+        if (obj instanceof Integer) {
+          sb.append(((Integer) obj).toString());
+          break;
+        }
+      case Types.TINYINT:
+      case Types.SMALLINT:
+        if (obj instanceof Short) {
+          sb.append(((Short) obj).toString());
+          break;
+        }
+        if (obj instanceof LongWritable) {
+          sb.append(((LongWritable) obj).get());
+          break;
+        }
+        if (obj instanceof VLongWritable) {
+          sb.append(((VLongWritable) obj).get());
+          break;
+        }
+        if (obj instanceof VIntWritable) {
+          sb.append(((VIntWritable) obj).get());
+          break;
+        }
+      case Types.REAL:
+      case Types.DECIMAL:
+      case Types.NUMERIC:
+      case Types.DOUBLE:
+        if (obj instanceof Double) {
+          sb.append(((Double) obj).toString());
+          break;
+        }
+        if (obj instanceof DoubleWritable) {
+          sb.append(((DoubleWritable) obj).toString());
+          break;
+        }
+      case Types.FLOAT:
+        if (obj instanceof Float) {
+          sb.append(((Float) obj).toString());
+          break;
+        }
+        if (obj instanceof FloatWritable) {
+          sb.append(((FloatWritable) obj).get());
+          break;
+        }
+      case Types.BINARY:
+      case Types.LONGVARBINARY:
+      case Types.VARBINARY:
+        if (obj instanceof BytesWritable) {
+          BytesWritable bytes = (BytesWritable) obj;
+          sb.append(ByteBuffer.wrap(bytes.getBytes(), 0, bytes.getLength())
+              .asCharBuffer());
+        } else {
+          sb.append(ByteBuffer.wrap((byte[]) obj).asCharBuffer());
+        }
+        break;
+      case Types.BIT:
+      case Types.BOOLEAN:
+        if (obj instanceof Boolean) {
+          if ((Boolean) obj)
+            sb.append("true");
+          else
+            sb.append("false");
+          break;
+        }
+        if (obj instanceof BooleanWritable) {
+          if (((BooleanWritable) obj).get())
+            sb.append("true");
+          else
+            sb.append("false");
+          break;
+        }
+      case Types.LONGNVARCHAR:
+      case Types.LONGVARCHAR:
+      case Types.NCHAR:
+      case Types.NVARCHAR:
+      case Types.VARCHAR:
+        if (obj instanceof String) {
+          sb.append((String) obj);
+          break;
+        }
+        if (obj instanceof byte[]) {
+          sb.append(new String((byte[]) obj));
+          break;
+        }
+        if (obj instanceof BytesWritable) {
+          BytesWritable bw = (BytesWritable) obj;
+          sb.append(new String(bw.getBytes(), 0, bw.getLength()));
+          break;
+        }
+      case Types.CHAR:
+        if (obj instanceof Character) {
+          sb.append((Character) obj);
+          break;
+        }
+        if (obj instanceof ByteWritable) {
+          sb.append(((ByteWritable) obj).get());
+          break;
+        }
+      case Types.DATE:
+      case Types.TIME:
+      case Types.TIMESTAMP:
+        if (obj instanceof java.util.Date)
+          sb.append(sqlfmt.format((java.util.Date) obj));
+        else if (obj instanceof Date)
+          sb.append(sqlfmt.format((Date) obj));
+        else if (obj instanceof Time)
+          sb.append(sqlfmt.format((Time) obj));
+        else if (obj instanceof Timestamp)
+          sb.append(sqlfmt.format((Timestamp) obj));
+        break;
+      default:
+        throw new RuntimeException("Unknown type value " + types.get(i));
+      }
+      if (i < columns - 1)
+        sb.append(delimiterArg);
+      else
+        sb.append(terminatorArg);
+    }
+    return sb.toString();
+  }
+
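+  /**
+   * Wire format (mirrored by write() below): an int column count, one int
+   * JDBC type code per column, then each value encoded according to its
+   * type; dates are formatted strings when dateString is set, epoch
+   * milliseconds otherwise.
+   */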
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    columns = in.readInt();
+    // clear any state left over from a previous record; Hadoop reuses
+    // Writable instances across calls
+    types.clear();
+    values.clear();
+    for (int i = 0; i < columns; i++)
+      types.add(in.readInt());
+
+    for (int i = 0; i < columns; i++) {
+      int type = types.get(i);
+      switch (type) {
+      case Types.BIGINT:
+        values.add(in.readLong());
+        break;
+      case Types.INTEGER:
+        values.add(in.readInt());
+        break;
+      case Types.TINYINT:
+      case Types.SMALLINT:
+        values.add(in.readShort());
+        break;
+      case Types.REAL:
+      case Types.DECIMAL:
+      case Types.NUMERIC:
+      case Types.DOUBLE:
+        values.add(in.readDouble());
+        break;
+      case Types.FLOAT:
+        values.add(in.readFloat());
+        break;
+      case Types.BINARY:
+      case Types.LONGVARBINARY:
+      case Types.VARBINARY:
+        values.add(StringUtils.hexStringToByte(Text.readString(in)));
+        break;
+      case Types.BIT:
+      case Types.BOOLEAN:
+        values.add(in.readBoolean());
+        break;
+      case Types.CHAR:
+        values.add(in.readChar());
+        break;
+      case Types.LONGNVARCHAR:
+      case Types.LONGVARCHAR:
+      case Types.NCHAR:
+      case Types.NVARCHAR:
+      case Types.VARCHAR:
+        values.add(Text.readString(in));
+        break;
+      case Types.DATE:
+        if (dateString)
+          try {
+            values.add(new Date(datefmt.parse(Text.readString(in)).getTime()));
+          } catch (ParseException e) {
+            throw new IOException(e);
+          }
+        else
+          values.add(new Date(in.readLong()));
+        break;
+      case Types.TIME:
+        if (dateString)
+          try {
+            values.add(new Time(timefmt.parse(Text.readString(in)).getTime()));
+          } catch (ParseException e) {
+            throw new IOException(e);
+          }
+        else
+          values.add(new Time(in.readLong()));
+        break;
+      case Types.TIMESTAMP:
+        if (dateString)
+          try {
+            values.add(new Timestamp(tmstmpfmt.parse(Text.readString(in))
+                .getTime()));
+          } catch (ParseException e) {
+            throw new IOException(e);
+          }
+        else
+          values.add(new Timestamp(in.readLong()));
+        break;
+      default:
+        throw new IOException("Unknown type value " + type);
+      }
+    }
+  }
+
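+  /**
+   * Serialize this record using the same wire format that readFields()
+   * expects: column count, per-column JDBC type codes, then the values.
+   */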
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(columns);
+    for (Integer type : types)
+      out.writeInt(type);
+
+    for (int i = 0; i < columns; i++) {
+      Object obj = values.get(i);
+      Integer type = types.get(i);
+
+      switch (type) {
+      case Types.BIGINT:
+        out.writeLong((Long) obj);
+        break;
+      case Types.INTEGER:
+        out.writeInt((Integer) obj);
+        break;
+      case Types.TINYINT:
+      case Types.SMALLINT:
+        out.writeShort((Short) obj);
+        break;
+      case Types.REAL:
+      case Types.DECIMAL:
+      case Types.NUMERIC:
+      case Types.DOUBLE:
+        out.writeDouble((Double) obj);
+        break;
+      case Types.FLOAT:
+        out.writeFloat((Float) obj);
+        break;
+      case Types.BINARY:
+      case Types.LONGVARBINARY:
+      case Types.VARBINARY:
+        Text.writeString(out, StringUtils.byteToHexString((byte[]) obj));
+        break;
+      case Types.BIT:
+      case Types.BOOLEAN:
+        out.writeBoolean((Boolean) obj);
+        break;
+      case Types.CHAR:
+        out.writeChar((Character) obj);
+        break;
+      case Types.LONGNVARCHAR:
+      case Types.LONGVARCHAR:
+      case Types.NCHAR:
+      case Types.NVARCHAR:
+      case Types.VARCHAR:
+        Text.writeString(out, (String) obj);
+        break;
+      case Types.DATE:
+        if (obj instanceof java.util.Date) {
+          if (dateString)
+            Text.writeString(out, datefmt.format((java.util.Date) obj));
+          else
+            out.writeLong(((java.util.Date) obj).getTime());
+        } else {
+          if (dateString)
+            Text.writeString(out, datefmt.format((Date) obj));
+          else
+            out.writeLong(((Date) obj).getTime());
+        }
+        break;
+      case Types.TIME:
+        if (dateString)
+          Text.writeString(out, timefmt.format((Time) obj));
+        else
+          out.writeLong(((Time) obj).getTime());
+        break;
+      case Types.TIMESTAMP:
+        if (dateString)
+          Text.writeString(out, tmstmpfmt.format((Timestamp) obj));
+        else
+          out.writeLong(((Timestamp) obj).getTime());
+        break;
+      default:
+        throw new IOException("Unknown type value " + types.get(i));
+      }
+    }
+  }
+
+}

Added: hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaRecordReader.java?rev=816735&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaRecordReader.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaRecordReader.java Fri Sep 18 18:24:31 2009
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.vertica;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
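+/**
+ * RecordReader that runs the input split's query and walks the resulting
+ * JDBC ResultSet, exposing each row as a VerticaRecord keyed by its offset
+ * within the split.
+ */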
+public class VerticaRecordReader extends
+    RecordReader<LongWritable, VerticaRecord> {
+  ResultSet results = null;
+  long start = 0;
+  int pos = 0;
+  long length = 0;
+  VerticaInputSplit split = null;
+  LongWritable key = null;
+  VerticaRecord value = null;
+
+  public VerticaRecordReader(VerticaInputSplit split, Configuration job)
+      throws Exception {
+    // run query for this segment
+    this.split = split;
+    split.configure(job);
+    start = split.getStart();
+    length = split.getLength();
+    results = split.executeQuery();
+  }
+
+  /** {@inheritDoc} */
+  public void initialize(InputSplit split, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    key = new LongWritable();
+    try {
+      pos++;
+      value = new VerticaRecord(results, false);
+    } catch (SQLException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /** {@inheritDoc} */
+  public void close() throws IOException {
+    try {
+      split.close();
+    } catch (SQLException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /** Number of records read so far. */
+  public long getPos() throws IOException {
+    return pos;
+  }
+
+  /** {@inheritDoc} */
+  public float getProgress() throws IOException {
+    // TODO: figure out why length would be 0
+    if (length == 0)
+      return 1;
+    // cast to float; integer division would always return 0
+    return pos / (float) length;
+  }
+
+  /** Advances to the next record, setting the given key and value. */
+  public boolean next(LongWritable key, VerticaRecord value) throws IOException {
+    key.set(pos + start);
+    pos++;
+    try {
+      return value.next();
+    } catch (SQLException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public LongWritable getCurrentKey() throws IOException, InterruptedException {
+    return key;
+  }
+
+  @Override
+  public VerticaRecord getCurrentValue() throws IOException,
+      InterruptedException {
+    return value;
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    key.set(pos + start);
+    pos++;
+    try {
+      return value.next();
+    } catch (SQLException e) {
+      throw new IOException(e);
+    }
+  }
+
+}

Added: hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaRecordWriter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaRecordWriter.java?rev=816735&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaRecordWriter.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaRecordWriter.java Fri Sep 18 18:24:31 2009
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.vertica;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.Method;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
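+/**
+ * RecordWriter that bulk loads rows into Vertica by streaming delimited
+ * lines into a COPY ... FROM STDIN statement.
+ */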
+public class VerticaRecordWriter extends RecordWriter<Text, VerticaRecord> {
+  String writerTable = null;
+  Connection connection = null;
+  Statement statement = null; // com.vertica.PGStatement
+  String copyStmt = null;
+  String delimiter = VerticaConfiguration.DELIMITER;
+  String terminator = VerticaConfiguration.RECORD_TERMINATER;
+
+  // Methods from com.vertica.PGStatement
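+  // resolved via reflection so this class compiles without the proprietary
+  // Vertica JDBC jar on the classpath; invoked in write() and close() below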
+  Method startCopyIn = null;
+  Method finishCopyIn = null;
+  Method addStreamToCopyIn = null;
+
+  public VerticaRecordWriter(Connection connection, String copyStmt,
+      String writerTable, String delimiter, String terminator) {
+    this.connection = connection;
+    this.copyStmt = copyStmt;
+    this.writerTable = writerTable;
+    this.delimiter = delimiter;
+    this.terminator = terminator;
+
+    try {
+      startCopyIn = Class.forName("com.vertica.PGStatement").getMethod(
+          "startCopyIn", String.class, InputStream.class);
+      finishCopyIn = Class.forName("com.vertica.PGStatement").getMethod(
+          "finishCopyIn");
+      addStreamToCopyIn = Class.forName("com.vertica.PGStatement").getMethod(
+          "addStreamToCopyIn", InputStream.class);
+    } catch (Exception e) {
+      throw new RuntimeException(
+          "VerticaRecordWriter requires the Vertica JDBC driver", e);
+    }
+  }
+
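+  /**
+   * Build an empty VerticaRecord whose column names and JDBC types are
+   * pulled from the output table's metadata, ready to be populated and
+   * passed to write().
+   */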
+  public VerticaRecord getValue() throws SQLException {
+    DatabaseMetaData dbmd = connection.getMetaData();
+
+    String schema = null;
+    String table = null;
+    String[] schemaTable = writerTable.split("\\.");
+    if (schemaTable.length == 2) {
+      schema = schemaTable[0];
+      table = schemaTable[1];
+    } else if (schemaTable.length == 1) {
+      table = schemaTable[0];
+    } else {
+      throw new RuntimeException(
+          "VerticaRecordWriter requires a valid output table");
+    }
+
+    List<Integer> types = new ArrayList<Integer>();
+    List<String> names = new ArrayList<String>();
+    ResultSet rs = dbmd.getColumns(null, schema, table, null);
+    while (rs.next()) {
+      names.add(rs.getString(4));
+      types.add(rs.getInt(5));
+    }
+    rs.close();
+
+    VerticaRecord record = new VerticaRecord(names, types);
+    return record;
+  }
+
+  @Override
+  public void close(TaskAttemptContext context) throws IOException {
+    try {
+      if (statement != null)
+        finishCopyIn.invoke(statement); // statement.finishCopyIn();
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public void write(Text table, VerticaRecord record) throws IOException {
+    if (!table.toString().equals(writerTable))
+      throw new IOException("Writing to different table " + table.toString()
+          + ". Expecting " + writerTable);
+
+    String strRecord = record.toSQLString(delimiter, terminator);
+
+    ByteArrayInputStream bais = new ByteArrayInputStream(strRecord.getBytes());
+    try {
+      if (statement == null) {
+        statement = connection.createStatement();
+        startCopyIn.invoke(statement, copyStmt, bais); // statement.startCopyIn(copyStmt,
+        // bais);
+      } else
+        addStreamToCopyIn.invoke(statement, bais); // statement.addStreamToCopyIn(bais);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+}

Added: hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaStreamingInput.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaStreamingInput.java?rev=816735&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaStreamingInput.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaStreamingInput.java Fri Sep 18 18:24:31 2009
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.vertica;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
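+/**
+ * InputFormat for streaming jobs: uses the same split logic as the native
+ * input format but returns rows as delimited Text instead of VerticaRecord.
+ */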
+public class VerticaStreamingInput extends InputFormat<Text, Text> {
+
+  @Override
+  public RecordReader<Text, Text> createRecordReader(InputSplit split,
+      TaskAttemptContext context) throws IOException {
+    try {
+      return new VerticaStreamingRecordReader((VerticaInputSplit) split,
+          context.getConfiguration());
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public List<InputSplit> getSplits(JobContext context) throws IOException {
+    return VerticaUtil.getSplits(context);
+  }
+}

Added: hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaStreamingOutput.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaStreamingOutput.java?rev=816735&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaStreamingOutput.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaStreamingOutput.java Fri Sep 18 18:24:31 2009
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.vertica;
+
+import java.io.IOException;
+import java.sql.Connection;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
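+/**
+ * OutputFormat for streaming jobs: takes rows that are already delimited
+ * Text and bulk loads them into the configured table via COPY ... FROM
+ * STDIN.
+ */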
+public class VerticaStreamingOutput extends OutputFormat<Text, Text> {
+  private static final Log LOG = LogFactory
+      .getLog(VerticaStreamingOutput.class);
+
+  String delimiter = VerticaConfiguration.DELIMITER;
+  String terminator = VerticaConfiguration.RECORD_TERMINATER;
+
+  @Override
+  public void checkOutputSpecs(JobContext context) throws IOException {
+    Configuration conf = context.getConfiguration();
+    VerticaUtil.checkOutputSpecs(conf);
+    VerticaConfiguration vtconfig = new VerticaConfiguration(conf);
+    delimiter = vtconfig.getOutputDelimiter();
+    terminator = vtconfig.getOutputRecordTerminator();
+    LOG.debug("Vertica output using delimiter '" + delimiter
+        + "' and terminator '" + terminator + "'");
+  }
+
+  @Override
+  public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context)
+      throws IOException {
+    Configuration conf = context.getConfiguration();
+    VerticaConfiguration vtconfig = new VerticaConfiguration(conf);
+
+    String name = context.getJobName();
+    delimiter = vtconfig.getOutputDelimiter();
+    terminator = vtconfig.getOutputRecordTerminator();
+
+    // TODO: use explicit date formats
+    String table = vtconfig.getOutputTableName();
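+    // builds e.g. "COPY mytable FROM STDIN DELIMITER '|' RECORD TERMINATOR
+    // '<terminator>' STREAM NAME 'myjob' DIRECT" (table/job name illustrative)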
+    String copyStmt = "COPY " + table + " FROM STDIN" + " DELIMITER '"
+        + delimiter + "' RECORD TERMINATOR '" + terminator + "' STREAM NAME '"
+        + name + "' DIRECT";
+
+    try {
+      Connection conn = vtconfig.getConnection(true);
+      return new VerticaStreamingRecordWriter(conn, copyStmt, table);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    return new FileOutputCommitter(FileOutputFormat.getOutputPath(context),
+        context);
+  }
+}


