Author: ddas
Date: Fri Sep 18 18:24:31 2009
New Revision: 816735
URL: http://svn.apache.org/viewvc?rev=816735&view=rev
Log:
MAPREDUCE-775. Add native and streaming support for Vertica as an input or output format taking advantage of parallel read and write properties of the DBMS. Contributed by Omer Trajman.
Added:
hadoop/mapreduce/trunk/src/contrib/vertica/
hadoop/mapreduce/trunk/src/contrib/vertica/build.xml
hadoop/mapreduce/trunk/src/contrib/vertica/ivy/
hadoop/mapreduce/trunk/src/contrib/vertica/ivy.xml
hadoop/mapreduce/trunk/src/contrib/vertica/ivy/libraries.properties
hadoop/mapreduce/trunk/src/contrib/vertica/src/
hadoop/mapreduce/trunk/src/contrib/vertica/src/java/
hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/
hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/
hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/
hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/
hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaConfiguration.java
hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaInputFormat.java
hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaInputSplit.java
hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaOutputFormat.java
hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaRecord.java
hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaRecordReader.java
hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaRecordWriter.java
hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaStreamingInput.java
hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaStreamingOutput.java
hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaStreamingRecordReader.java
hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaStreamingRecordWriter.java
hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaUtil.java
hadoop/mapreduce/trunk/src/contrib/vertica/src/test/
hadoop/mapreduce/trunk/src/contrib/vertica/src/test/org/
hadoop/mapreduce/trunk/src/contrib/vertica/src/test/org/apache/
hadoop/mapreduce/trunk/src/contrib/vertica/src/test/org/apache/hadoop/
hadoop/mapreduce/trunk/src/contrib/vertica/src/test/org/apache/hadoop/vertica/
hadoop/mapreduce/trunk/src/contrib/vertica/src/test/org/apache/hadoop/vertica/AllTests.java
hadoop/mapreduce/trunk/src/contrib/vertica/src/test/org/apache/hadoop/vertica/TestExample.java
hadoop/mapreduce/trunk/src/contrib/vertica/src/test/org/apache/hadoop/vertica/TestVertica.java
hadoop/mapreduce/trunk/src/contrib/vertica/src/test/org/apache/hadoop/vertica/VerticaTestCase.java
hadoop/mapreduce/trunk/src/contrib/vertica/testdata/
hadoop/mapreduce/trunk/src/contrib/vertica/testdata/vertica_test.sql
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/contrib/build.xml
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=816735&r1=816734&r2=816735&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Sep 18 18:24:31 2009
@@ -114,6 +114,10 @@
MAPREDUCE-777. Brand new apis to track and query jobs as a
replacement for JobClient. (Amareshwari Sriramadasu via acmurthy)
+ MAPREDUCE-775. Add native and streaming support for Vertica as an input
+ or output format taking advantage of parallel read and write properties of
+ the DBMS. (Omer Trajman via ddas)
+
IMPROVEMENTS
MAPREDUCE-816. Rename "local" mysql import to "direct" in Sqoop.
Modified: hadoop/mapreduce/trunk/src/contrib/build.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/build.xml?rev=816735&r1=816734&r2=816735&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/build.xml (original)
+++ hadoop/mapreduce/trunk/src/contrib/build.xml Fri Sep 18 18:24:31 2009
@@ -57,6 +57,7 @@
<fileset dir="." includes="mrunit/build.xml"/>
<fileset dir="." includes="dynamic-scheduler/build.xml"/>
<fileset dir="." includes="gridmix/build.xml"/>
+ <fileset dir="." includes="vertica/build.xml"/>
</subant>
<available file="${build.contrib.dir}/testsfailed" property="testsfailed"/>
<fail if="testsfailed">Tests failed!</fail>
Added: hadoop/mapreduce/trunk/src/contrib/vertica/build.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/vertica/build.xml?rev=816735&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/vertica/build.xml (added)
+++ hadoop/mapreduce/trunk/src/contrib/vertica/build.xml Fri Sep 18 18:24:31 2009
@@ -0,0 +1,104 @@
+<?xml version="1.0"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!--
+Before you can run these subtargets directly, you need
+to call at top-level: ant deploy-contrib compile-core-test
+-->
+<project name="vertica" default="jar">
+
+ <import file="../build-contrib.xml"/>
+ <property environment="env"/>
+
+ <!-- ================================================================== -->
+ <!-- Run unit tests -->
+ <!-- Override with our own version so we can set hadoop.alt.classpath -->
+ <!-- and Hadoop logger properties -->
+ <!-- ================================================================== -->
+  <target name="test" depends="compile-test, compile" if="test.available">
+    <!-- Vertica connection settings handed to the tests. These are only
+         defaults: Ant properties cannot be reassigned once set, so values
+         given on the command line (e.g. -Dvertica.hostname=dbhost) win. -->
+    <property name="vertica.hostname" value="localhost"/>
+    <property name="vertica.database" value="db"/>
+    <property name="vertica.username" value="dbadmin"/>
+    <property name="vertica.password" value=""/>
+
+    <echo message="contrib: ${name}"/>
+    <delete dir="${hadoop.log.dir}"/>
+    <mkdir dir="${hadoop.log.dir}"/>
+    <delete dir="${build.test}/data"/>
+    <mkdir dir="${build.test}/data" />
+    <junit
+      printsummary="yes" showoutput="${test.output}"
+      haltonfailure="no" fork="yes" maxmemory="256m"
+      errorProperty="tests.failed" failureProperty="tests.failed"
+      timeout="${test.timeout}"
+      dir="${build.test}/data">
+
+      <!-- uncomment this if you want to attach a debugger -->
+      <!--
+      <jvmarg line="-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=2601" />
+      -->
+
+      <sysproperty key="test.build.data" value="${build.test}/data"/>
+      <sysproperty key="build.test" value="${build.test}"/>
+      <sysproperty key="contrib.name" value="${name}"/>
+
+      <!--
+        Added property needed to use the .class files for compilation
+        instead of depending on hadoop-*-core.jar
+      -->
+      <sysproperty key="hadoop.alt.classpath"
+        value="${hadoop.root}/build/classes" />
+
+      <!-- we want more log4j output when running unit tests -->
+      <sysproperty key="hadoop.root.logger"
+        value="DEBUG,console" />
+
+      <!-- requires fork=yes for:
+        relative File paths to use the specified user.dir
+        classpath to use build/contrib/*.jar
+      -->
+      <sysproperty key="user.dir" value="${build.test}/data"/>
+
+      <sysproperty key="fs.default.name" value="${fs.default.name}"/>
+      <sysproperty key="hadoop.test.localoutputfile" value="${hadoop.test.localoutputfile}"/>
+      <sysproperty key="hadoop.log.dir" value="${hadoop.log.dir}"/>
+
+      <!-- Path to SQL setup script. -->
+      <sysproperty key="vertica.test_setup" value="${basedir}/testdata/vertica_test.sql" />
+
+      <!-- Vertica database parameters (overridable, see the defaults above) -->
+      <sysproperty key="vertica.hostname" value="${vertica.hostname}" />
+      <sysproperty key="vertica.database" value="${vertica.database}" />
+      <sysproperty key="vertica.username" value="${vertica.username}" />
+      <sysproperty key="vertica.password" value="${vertica.password}" />
+
+      <!-- tools.jar from Sun JDK also required to invoke javac. -->
+      <classpath>
+        <path refid="test.classpath"/>
+        <path refid="contrib-classpath"/>
+        <pathelement path="${env.JAVA_HOME}/lib/tools.jar" />
+      </classpath>
+      <formatter type="${test.junit.output.format}" />
+      <batchtest todir="${build.test}" unless="testcase">
+        <fileset dir="${src.test}"
+          includes="**/Test*.java" excludes="**/${test.exclude}.java" />
+      </batchtest>
+      <batchtest todir="${build.test}" if="testcase">
+        <fileset dir="${src.test}" includes="**/${testcase}.java"/>
+      </batchtest>
+    </junit>
+    <fail if="tests.failed">Tests failed!</fail>
+  </target>
+
+</project>
Added: hadoop/mapreduce/trunk/src/contrib/vertica/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/vertica/ivy.xml?rev=816735&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/vertica/ivy.xml (added)
+++ hadoop/mapreduce/trunk/src/contrib/vertica/ivy.xml Fri Sep 18 18:24:31 2009
@@ -0,0 +1,64 @@
+<?xml version="1.0" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<ivy-module version="1.0">
+ <info organisation="org.apache.hadoop" module="${ant.project.name}">
+ <license name="Apache 2.0"/>
+ <ivyauthor name="Apache Hadoop Team" url="http://hadoop.apache.org"/>
+ <description>
+ Apache Hadoop
+ </description>
+ </info>
+ <configurations defaultconfmapping="default">
+ <!--these match the Maven configurations-->
+ <conf name="default" extends="master,runtime"/>
+ <conf name="master" description="contains the artifact but no dependencies"/>
+ <conf name="runtime" description="runtime but not the artifact" />
+
+ <conf name="common" visibility="private"
+ extends="runtime"
+ description="artifacts needed to compile/test the application"/>
+ <conf name="test" visibility="private" extends="runtime"/>
+ </configurations>
+
+ <publications>
+ <!--get the artifact from our module name-->
+ <artifact conf="master"/>
+ </publications>
+ <dependencies>
+ <dependency org="commons-logging"
+ name="commons-logging"
+ rev="${commons-logging.version}"
+ conf="common->default"/>
+ <dependency org="commons-httpclient"
+ name="commons-httpclient"
+ rev="${commons-httpclient.version}"
+ conf="common->default"/>
+ <dependency org="commons-cli"
+ name="commons-cli"
+ rev="${commons-cli.version}"
+ conf="common->default"/>
+ <dependency org="junit"
+ name="junit"
+ rev="${junit.version}"
+ conf="common->default"/>
+ <dependency org="log4j"
+ name="log4j"
+ rev="${log4j.version}"
+ conf="common->master"/>
+ </dependencies>
+</ivy-module>
Added: hadoop/mapreduce/trunk/src/contrib/vertica/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/vertica/ivy/libraries.properties?rev=816735&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/vertica/ivy/libraries.properties (added)
+++ hadoop/mapreduce/trunk/src/contrib/vertica/ivy/libraries.properties Fri Sep 18 18:24:31 2009
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+#This properties file lists the versions of the various artifacts used by the Vertica contrib module.
+#It drives ivy and the generation of a maven POM
+
+#Please list the dependencies name with version if they are different from the ones
+#listed in the global libraries.properties file (in alphabetical order)
+
Added: hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaConfiguration.java?rev=816735&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaConfiguration.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaConfiguration.java Fri Sep 18 18:24:31 2009
@@ -0,0 +1,464 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.vertica;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * A container for configuration property names for jobs with Vertica
+ * input/output.
+ *
+ * The job can be configured using the static methods in this class,
+ * {@link VerticaInputFormat}, and {@link VerticaOutputFormat}. Alternatively,
+ * the properties can be set in the configuration with proper values.
+ *
+ * @see VerticaConfiguration#configureVertica(Configuration, String[], String,
+ * String, String)
+ * @see VerticaConfiguration#configureVertica(Configuration, String[], String,
+ * String, String, String[], String, String, String)
+ * @see VerticaInputFormat#setInput(Job, String)
+ * @see VerticaInputFormat#setInput(Job, String, Collection<List<Object>>)
+ * @see VerticaInputFormat#setInput(Job, String, String)
+ * @see VerticaInputFormat#setInput(Job, String, String...)
+ * @see VerticaOutputFormat#setOutput(Job, String)
+ * @see VerticaOutputFormat#setOutput(Job, String, Collection<VerticaTable>)
+ * @see VerticaOutputFormat#setOutput(Job, String, boolean)
+ * @see VerticaOutputFormat#setOutput(Job, String, boolean, String...)
+ */
+public class VerticaConfiguration {
+ /** Class name for Vertica JDBC Driver */
+ public static final String VERTICA_DRIVER_CLASS = "com.vertica.Driver";
+
+ /** Host names to connect to, selected from at random */
+ public static final String HOSTNAMES_PROP = "mapred.vertica.hostnames";
+
+ /** Name of database to connect to */
+ public static final String DATABASE_PROP = "mapred.vertica.database";
+
+ /** User name for Vertica */
+ public static final String USERNAME_PROP = "mapred.vertica.username";
+
+ /** Password for Vertica */
+ public static final String PASSWORD_PROP = "mapred.vertica.password";
+
+ /** Host names to connect to, selected from at random */
+ public static final String OUTPUT_HOSTNAMES_PROP = "mapred.vertica.hostnames.output";
+
+ /** Name of database to connect to */
+ public static final String OUTPUT_DATABASE_PROP = "mapred.vertica.database.output";
+
+ /** User name for Vertica */
+ public static final String OUTPUT_USERNAME_PROP = "mapred.vertica.username.output";
+
+ /** Password for Vertica */
+ public static final String OUTPUT_PASSWORD_PROP = "mapred.vertica.password.output";
+
+ /** Query to run for input */
+ public static final String QUERY_PROP = "mapred.vertica.input.query";
+
+ /** Query to run to retrieve parameters */
+ public static final String QUERY_PARAM_PROP = "mapred.vertica.input.query.paramquery";
+
+ /** Static parameters for query */
+ public static final String QUERY_PARAMS_PROP = "mapred.vertica.input.query.params";
+
+ /** Optional input delimiter for streaming */
+ public static final String INPUT_DELIMITER_PROP = "mapred.vertica.input.delimiter";
+
+ /** Optional input terminator for streaming */
+ public static final String INPUT_TERMINATOR_PROP = "mapred.vertica.input.terminator";
+
+ /** Whether to marshal dates as strings */
+ public static final String DATE_STRING = "mapred.vertica.date_as_string";
+
+ /** Output table name */
+ public static final String OUTPUT_TABLE_NAME_PROP = "mapred.vertica.output.table.name";
+
+ /** Definition of output table types */
+ public static final String OUTPUT_TABLE_DEF_PROP = "mapred.vertica.output.table.def";
+
+ /** Whether to drop tables */
+ public static final String OUTPUT_TABLE_DROP = "mapred.vertica.output.table.drop";
+
+ /** Optional output format delimiter */
+ public static final String OUTPUT_DELIMITER_PROP = "mapred.vertica.output.delimiter";
+
+ /** Optional output format terminator */
+ public static final String OUTPUT_TERMINATOR_PROP = "mapred.vertica.output.terminator";
+
+ /**
+ * Override the sleep timer for optimize to poll when new projetions have
+ * refreshed
+ */
+ public static final String OPTIMIZE_POLL_TIMER_PROP = "mapred.vertica.optimize.poll";
+
+ /**
+ * Sets the Vertica database connection information in the (@link
+ * Configuration)
+ *
+ * @param conf
+ * the configuration
+ * @param hostnames
+ * one or more hosts in the Vertica cluster
+ * @param database
+ * the name of the Vertica database
+ * @param username
+ * Vertica database username
+ * @param password
+ * Vertica database password
+ */
+ public static void configureVertica(Configuration conf, String[] hostnames,
+ String database, String username, String password) {
+ conf.setStrings(HOSTNAMES_PROP, hostnames);
+ conf.set(DATABASE_PROP, database);
+ conf.set(USERNAME_PROP, username);
+ conf.set(PASSWORD_PROP, password);
+ }
+
+ /**
+ * Sets the Vertica database connection information in the (@link
+ * Configuration)
+ *
+ * @param conf
+ * the configuration
+ * @param hostnames
+ * one or more hosts in the source Cluster
+ * @param database
+ * the name of the source Vertica database
+ * @param username
+ * for the source Vertica database
+ * @param password
+ * for he source Vertica database
+ * @param output_hostnames
+ * one or more hosts in the output Cluster
+ * @param output_database
+ * the name of the output VerticaDatabase
+ * @param output_username
+ * for the target Vertica database
+ * @param output_password
+ * for the target Vertica database
+ */
+ public static void configureVertica(Configuration conf, String[] hostnames,
+ String database, String username, String password,
+ String[] output_hostnames, String output_database,
+ String output_username, String output_password) {
+ configureVertica(conf, hostnames, database, username, password);
+
+ conf.setStrings(OUTPUT_HOSTNAMES_PROP, output_hostnames);
+ conf.set(OUTPUT_DATABASE_PROP, output_database);
+ conf.set(OUTPUT_USERNAME_PROP, output_username);
+ conf.set(OUTPUT_PASSWORD_PROP, output_password);
+ }
+
+ private Configuration conf;
+
+ // default record terminator for writing output to Vertica
+ public static final String RECORD_TERMINATER = "\u0008";
+
+ // default delimiter for writing output to Vertica
+ public static final String DELIMITER = "\u0007";
+
+ // defulat optimize poll timeout
+ public static final int OPTIMIZE_POLL_TIMER = 1000;
+
+ VerticaConfiguration(Configuration conf) {
+ this.conf = conf;
+ }
+
+ public Configuration getConfiguration() {
+ return conf;
+ }
+
+ /**
+ * Returns a connection to a random host in the Vertica cluster
+ *
+ * @param output
+ * true if the connection is for writing
+ * @throws IOException
+ * @throws ClassNotFoundException
+ * @throws SQLException
+ */
+ Connection getConnection(boolean output) throws IOException,
+ ClassNotFoundException, SQLException {
+ try {
+ Class.forName(VERTICA_DRIVER_CLASS);
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+
+ String[] hosts = conf.getStrings(HOSTNAMES_PROP);
+ String user = conf.get(USERNAME_PROP);
+ String pass = conf.get(PASSWORD_PROP);
+ String database = conf.get(DATABASE_PROP);
+
+ if (output) {
+ hosts = conf.getStrings(OUTPUT_HOSTNAMES_PROP, hosts);
+ user = conf.get(OUTPUT_USERNAME_PROP, user);
+ pass = conf.get(OUTPUT_PASSWORD_PROP, pass);
+ database = conf.get(OUTPUT_DATABASE_PROP, database);
+ }
+
+ if (hosts == null)
+ throw new IOException("Vertica requies a hostname defined by "
+ + HOSTNAMES_PROP);
+ if (hosts.length == 0)
+ throw new IOException("Vertica requies a hostname defined by "
+ + HOSTNAMES_PROP);
+ if (database == null)
+ throw new IOException("Vertica requies a database name defined by "
+ + DATABASE_PROP);
+ Random r = new Random();
+ if (user == null)
+ throw new IOException("Vertica requires a username defined by "
+ + USERNAME_PROP);
+ return DriverManager.getConnection("jdbc:vertica://"
+ + hosts[r.nextInt(hosts.length)] + ":5433/" + database, user, pass);
+ }
+
+ public String getInputQuery() {
+ return conf.get(QUERY_PROP);
+ }
+
+ /**
+ * get Run this query and give the results to mappers.
+ *
+ * @param inputQuery
+ */
+ public void setInputQuery(String inputQuery) {
+ inputQuery = inputQuery.trim();
+ if (inputQuery.endsWith(";")) {
+ inputQuery = inputQuery.substring(0, inputQuery.length() - 1);
+ }
+ conf.set(QUERY_PROP, inputQuery);
+ }
+
+ /**
+ * Return the query used to retrieve parameters for the input query (if set)
+ *
+ * @return Returns the query for input parameters
+ */
+ public String getParamsQuery() {
+ return conf.get(QUERY_PARAM_PROP);
+ }
+
+ /**
+ * Query used to retrieve parameters for the input query. The result set must
+ * match the input query parameters preceisely.
+ *
+ * @param segment_params_query
+ */
+ public void setParamsQuery(String segment_params_query) {
+ conf.set(QUERY_PARAM_PROP, segment_params_query);
+ }
+
+ /**
+ * Return static input parameters if set
+ *
+ * @return Collection of list of objects representing input parameters
+ * @throws IOException
+ */
+ public Collection<List<Object>> getInputParameters() throws IOException {
+ Collection<List<Object>> values = null;
+ String[] query_params = conf.getStrings(QUERY_PARAMS_PROP);
+ if (query_params != null) {
+ values = new ArrayList<List<Object>>();
+ for (String str_params : query_params) {
+ DataInputBuffer in = new DataInputBuffer();
+ in.reset(StringUtils.hexStringToByte(str_params), str_params.length());
+ VerticaRecord record = new VerticaRecord();
+ record.readFields(in);
+ values.add(record.getValues());
+ }
+ }
+ return values;
+ }
+
+ /**
+ * Sets a collection of lists. Each list is passed to an input split and used
+ * as arguments to the input query.
+ *
+ * @param segmentParams
+ * @throws IOException
+ */
+ public void setInputParams(Collection<List<Object>> segment_params)
+ throws IOException {
+ String[] values = new String[segment_params.size()];
+ int i = 0;
+ for (List<Object> params : segment_params) {
+ DataOutputBuffer out = new DataOutputBuffer();
+ VerticaRecord record = new VerticaRecord(params, true);
+ record.write(out);
+ values[i++] = StringUtils.byteToHexString(out.getData());
+ }
+ conf.setStrings(QUERY_PARAMS_PROP, values);
+ }
+
+ /**
+ * For streaming return the delimiter to separate values to the mapper
+ *
+ * @return Returns delimiter used to format streaming input data
+ */
+ public String getInputDelimiter() {
+ return conf.get(INPUT_DELIMITER_PROP, DELIMITER);
+ }
+
+ /**
+ * For streaming set the delimiter to separate values to the mapper
+ */
+ public void setInputDelimiter(String delimiter) {
+ conf.set(INPUT_DELIMITER_PROP, delimiter);
+ }
+
+ /**
+ * For streaming return the record terminator to separate values to the mapper
+ *
+ * @return Returns recorder terminator for input data
+ */
+ public String getInputRecordTerminator() {
+ return conf.get(INPUT_TERMINATOR_PROP, RECORD_TERMINATER);
+ }
+
+ /**
+ * For streaming set the record terminator to separate values to the mapper
+ */
+ public void setInputRecordTerminator(String terminator) {
+ conf.set(INPUT_TERMINATOR_PROP, terminator);
+ }
+
+ /**
+ * Get the table that is the target of output
+ *
+ * @return Returns table name for output
+ */
+ public String getOutputTableName() {
+ return conf.get(OUTPUT_TABLE_NAME_PROP);
+ }
+
+ /**
+ * Set table that is being loaded as output
+ *
+ * @param tableName
+ */
+ public void setOutputTableName(String tableName) {
+ conf.set(OUTPUT_TABLE_NAME_PROP, tableName);
+ }
+
+ /**
+ * Return definition of columns for output table
+ *
+ * @return Returns table definition for output table
+ */
+ public String[] getOutputTableDef() {
+ return conf.getStrings(OUTPUT_TABLE_DEF_PROP);
+ }
+
+ /**
+ * Set the definition of a table for output if it needs to be created
+ *
+ * @param fieldNames
+ */
+ public void setOutputTableDef(String... fieldNames) {
+ conf.setStrings(OUTPUT_TABLE_DEF_PROP, fieldNames);
+ }
+
+ /**
+ * Return whether output table is truncated before loading
+ *
+ * @return Returns true if output table should be dropped before loading
+ */
+ public boolean getDropTable() {
+ return conf.getBoolean(OUTPUT_TABLE_DROP, false);
+ }
+
+ /**
+ * Set whether to truncate the output table before loading
+ *
+ * @param drop_table
+ */
+ public void setDropTable(boolean drop_table) {
+ conf.setBoolean(OUTPUT_TABLE_DROP, drop_table);
+ }
+
+ /**
+ * For streaming return the delimiter used by the reducer
+ *
+ * @return Returns delimiter to use for output data
+ */
+ public String getOutputDelimiter() {
+ return conf.get(OUTPUT_DELIMITER_PROP, DELIMITER);
+ }
+
+ /**
+ * For streaming set the delimiter used by the reducer
+ *
+ * @param delimiter
+ */
+ public void setOutputDelimiter(String delimiter) {
+ conf.set(OUTPUT_DELIMITER_PROP, delimiter);
+ }
+
+ /**
+ * For streaming return the record terminator used by the reducer
+ *
+ * @return Returns the record terminator for output data
+ */
+ public String getOutputRecordTerminator() {
+ return conf.get(OUTPUT_TERMINATOR_PROP, RECORD_TERMINATER);
+ }
+
+ /**
+ * For streaming set the record terminator used by the reducer
+ *
+ * @param terminator
+ */
+ public void setOutputRecordTerminator(String terminator) {
+ conf.set(OUTPUT_TERMINATOR_PROP, terminator);
+ }
+
+ /**
+ * Returns poll timer for optimize loop
+ *
+ * @return Returns poll timer for optimize loop
+ */
+ public Long getOptimizePollTimeout() {
+ return conf.getLong(OPTIMIZE_POLL_TIMER_PROP, OPTIMIZE_POLL_TIMER);
+ }
+
+ /**
+ * Set the timour for the optimize poll loop
+ *
+ * @param timeout
+ */
+ public void setOptimizePollTimeout(Long timeout) {
+ conf.setLong(OPTIMIZE_POLL_TIMER_PROP, timeout);
+ }
+}
Added: hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaInputFormat.java?rev=816735&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaInputFormat.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaInputFormat.java Fri Sep 18 18:24:31 2009
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.vertica;
+
+import java.io.IOException;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Input formatter that returns the results of a query executed against Vertica.
+ * The key is a record number within the result set of each mapper The value is
+ * a VerticaRecord, which uses a similar interface to JDBC ResultSets for
+ * returning values.
+ *
+ */
+public class VerticaInputFormat extends
+ InputFormat<LongWritable, VerticaRecord> {
+
+ /**
+ * Set the input query for a job
+ *
+ * @param job
+ * @param inputQuery
+ * query to run against Vertica
+ */
+ public static void setInput(Job job, String inputQuery) {
+ job.setInputFormatClass(VerticaInputFormat.class);
+ VerticaConfiguration config = new VerticaConfiguration(job
+ .getConfiguration());
+ config.setInputQuery(inputQuery);
+ }
+
+ /**
+ * Set a parameterized input query for a job and the query that returns the
+ * parameters.
+ *
+ * @param job
+ * @param inputQuery
+ * SQL query that has parameters specified by question marks ("?")
+ * @param segmentParamsQuery
+ * SQL query that returns parameters for the input query
+ */
+ public static void setInput(Job job, String inputQuery,
+ String segmentParamsQuery) {
+ job.setInputFormatClass(VerticaInputFormat.class);
+ VerticaConfiguration config = new VerticaConfiguration(job
+ .getConfiguration());
+ config.setInputQuery(inputQuery);
+ config.setParamsQuery(segmentParamsQuery);
+ }
+
+ /**
+ * Set the input query and any number of comma delimited literal list of
+ * parameters
+ *
+ * @param job
+ * @param inputQuery
+ * SQL query that has parameters specified by question marks ("?")
+ * @param segmentParams
+ * any numer of comma delimited strings with literal parameters to
+ * substitute in the input query
+ */
+ @SuppressWarnings("serial")
+ public static void setInput(Job job, String inputQuery,
+ String... segmentParams) throws IOException {
+ // transform each param set into array
+ DateFormat datefmt = DateFormat.getDateInstance();
+ Collection<List<Object>> params = new HashSet<List<Object>>() {
+ };
+ for (String strParams : segmentParams) {
+ List<Object> param = new ArrayList<Object>();
+
+ for (String strParam : strParams.split(",")) {
+ strParam = strParam.trim();
+ if (strParam.charAt(0) == '\''
+ && strParam.charAt(strParam.length() - 1) == '\'')
+ param.add(strParam.substring(1, strParam.length() - 1));
+ else {
+ try {
+ param.add(datefmt.parse(strParam));
+ } catch (ParseException e1) {
+ try {
+ param.add(Integer.parseInt(strParam));
+ } catch (NumberFormatException e2) {
+ throw new IOException("Error parsing argument " + strParam);
+ }
+ }
+ }
+ }
+
+ params.add(param);
+ }
+
+ setInput(job, inputQuery, params);
+ }
+
+ /**
+ * Set the input query and a collection of parameter lists
+ *
+ * @param job
+ * @param inpuQuery
+ * SQL query that has parameters specified by question marks ("?")
+ * @param segmentParams
+ * collection of ordered lists to subtitute into the input query
+ * @throws IOException
+ */
+ public static void setInput(Job job, String inpuQuery,
+ Collection<List<Object>> segmentParams) throws IOException {
+ job.setInputFormatClass(VerticaInputFormat.class);
+ VerticaConfiguration config = new VerticaConfiguration(job
+ .getConfiguration());
+ config.setInputQuery(inpuQuery);
+ config.setInputParams(segmentParams);
+ }
+
+ /** {@inheritDoc} */
+ public RecordReader<LongWritable, VerticaRecord> createRecordReader(
+ InputSplit split, TaskAttemptContext context) throws IOException {
+ try {
+ return new VerticaRecordReader((VerticaInputSplit) split, context
+ .getConfiguration());
+ } catch (Exception e) {
+ e.printStackTrace();
+ return null;
+ }
+ }
+
+ /** {@inheritDoc} */
+ public List<InputSplit> getSplits(JobContext context) throws IOException {
+ return VerticaUtil.getSplits(context);
+ }
+}
Added: hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaInputSplit.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaInputSplit.java?rev=816735&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaInputSplit.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaInputSplit.java Fri Sep 18 18:24:31 2009
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.vertica;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+/**
+ * Input split class for reading data from Vertica
+ *
+ */
+public class VerticaInputSplit extends InputSplit implements Writable {
+ private static final Log LOG = LogFactory.getLog(VerticaInputSplit.class);
+
+ PreparedStatement stmt = null;
+ Connection connection = null;
+ VerticaConfiguration vtconfig = null;
+ String inputQuery = null;
+ List<Object> segmentParams = null;
+ long start = 0;
+ long end = 0;
+
+ /** (@inheritDoc) */
+ public VerticaInputSplit() {
+ LOG.trace("Input split default constructor");
+ }
+
+ /**
+ * Set the input query and a list of parameters to substitute when evaluating
+ * the query
+ *
+ * @param inputQuery
+ * SQL query to run
+ * @param segmentParams
+ * list of parameters to substitute into the query
+ * @param start
+ * the logical starting record number
+ * @param end
+ * the logical ending record number
+ */
+ public VerticaInputSplit(String inputQuery, List<Object> segmentParams,
+ long start, long end) {
+ LOG.trace("Input split constructor with query and params");
+ this.inputQuery = inputQuery;
+ this.segmentParams = segmentParams;
+ this.start = start;
+ this.end = end;
+ }
+
+ /** (@inheritDoc) */
+ public void configure(Configuration conf) throws Exception {
+ LOG.trace("Input split configured");
+ vtconfig = new VerticaConfiguration(conf);
+ connection = vtconfig.getConnection(false);
+ connection.setAutoCommit(true);
+ connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
+ }
+
+ /**
+ * Return the parameters used for input query
+ *
+ * @return
+ */
+ public List<Object> getSegmentParams() {
+ return segmentParams;
+ }
+
+ /**
+ * Run the query that, when executed returns input for the mapper
+ *
+ * @return
+ * @throws Exception
+ */
+ public ResultSet executeQuery() throws Exception {
+ LOG.trace("Input split execute query");
+ long length = getLength();
+
+ if (length != 0)
+ inputQuery = "SELECT * FROM ( " + inputQuery
+ + " ) limited LIMIT ? OFFSET ?";
+
+ if (connection == null)
+ throw new Exception("Cannot execute query with no connection");
+ stmt = connection.prepareStatement(inputQuery);
+
+ int i = 1;
+ if (segmentParams != null)
+ for (Object param : segmentParams)
+ stmt.setObject(i++, param);
+
+ if (length != 0) {
+ stmt.setLong(i++, length);
+ stmt.setLong(i++, start);
+ }
+
+ ResultSet rs = stmt.executeQuery();
+ return rs;
+ }
+
+ /** (@inheritDoc) */
+ public void close() throws SQLException {
+ stmt.close();
+ }
+
+ /**
+ * @return The index of the first row to select
+ */
+ public long getStart() {
+ return start;
+ }
+
+ /**
+ * @return The index of the last row to select
+ */
+ public long getEnd() {
+ return end;
+ }
+
+ /**
+ * @return The total row count in this split
+ */
+ public long getLength() throws IOException {
+ // TODO: figureout how to return length when there is no start and end
+ return end - start;
+ }
+
+ /** {@inheritDoc} */
+ public String[] getLocations() throws IOException {
+ return new String[] {};
+ }
+
+ /** (@inheritDoc) */
+ public Configuration getConfiguration() {
+ return vtconfig.getConfiguration();
+ }
+
+ /** {@inheritDoc} */
+ public void readFields(DataInput in) throws IOException {
+ inputQuery = Text.readString(in);
+ long paramCount = in.readLong();
+ if (paramCount > 0) {
+ VerticaRecord record = new VerticaRecord();
+ record.readFields(in);
+ segmentParams = record.getValues();
+ }
+ start = in.readLong();
+ end = in.readLong();
+ }
+
+ /** {@inheritDoc} */
+ public void write(DataOutput out) throws IOException {
+ Text.writeString(out, inputQuery);
+ if (segmentParams != null && segmentParams.size() > 0) {
+ out.writeLong(segmentParams.size());
+ VerticaRecord record = new VerticaRecord(segmentParams, true);
+ record.write(out);
+ } else
+ out.writeLong(0);
+ out.writeLong(start);
+ out.writeLong(end);
+ }
+}
Added: hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaOutputFormat.java?rev=816735&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaOutputFormat.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaOutputFormat.java Fri Sep 18 18:24:31 2009
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.vertica;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+/**
+ * Output formatter for loading reducer output to Vertica.
+ *
+ * Each record writer streams its records into the configured output table
+ * through a {@code COPY ... FROM STDIN ... DIRECT} statement.
+ */
+public class VerticaOutputFormat extends OutputFormat<Text, VerticaRecord> {
+  String delimiter = VerticaConfiguration.DELIMITER;
+  String terminator = VerticaConfiguration.RECORD_TERMINATER;
+
+  /**
+   * Set the output table.
+   *
+   * @param job
+   *          the job to configure
+   * @param tableName
+   *          name of the table to load
+   */
+  public static void setOutput(Job job, String tableName) {
+    setOutput(job, tableName, false);
+  }
+
+  /**
+   * Set the output table and whether to drop it before loading.
+   *
+   * @param job
+   *          the job to configure
+   * @param tableName
+   *          name of the table to load
+   * @param dropTable
+   *          whether to drop the table before loading
+   */
+  public static void setOutput(Job job, String tableName, boolean dropTable) {
+    // Pass an explicit empty definition list. A bare three-argument call
+    // resolves to this very overload (exact arity wins over varargs) and
+    // recursed until StackOverflowError.
+    setOutput(job, tableName, dropTable, new String[0]);
+  }
+
+  /**
+   * Set the output table, whether to drop it before loading and the create
+   * table specification if it doesn't exist.
+   *
+   * @param job
+   *          the job to configure
+   * @param tableName
+   *          name of the table to load
+   * @param dropTable
+   *          whether to drop the table before loading
+   * @param tableDef
+   *          list of column definitions such as "foo int", "bar varchar(10)"
+   */
+  public static void setOutput(Job job, String tableName, boolean dropTable,
+      String... tableDef) {
+    VerticaConfiguration vtconfig = new VerticaConfiguration(job
+        .getConfiguration());
+    vtconfig.setOutputTableName(tableName);
+    vtconfig.setOutputTableDef(tableDef);
+    vtconfig.setDropTable(dropTable);
+  }
+
+  // TODO: handle collection of output tables private class VerticaTable {
+
+  /** {@inheritDoc} */
+  @Override
+  public void checkOutputSpecs(JobContext context) throws IOException {
+    VerticaUtil.checkOutputSpecs(context.getConfiguration());
+    VerticaConfiguration vtconfig = new VerticaConfiguration(context
+        .getConfiguration());
+    delimiter = vtconfig.getOutputDelimiter();
+    terminator = vtconfig.getOutputRecordTerminator();
+  }
+
+  /**
+   * Test check specs (don't connect to db).
+   *
+   * NOTE(review): {@code test} is not consulted; this performs exactly the same
+   * checks as {@link #checkOutputSpecs(JobContext)}. Whether that avoids a
+   * database connection depends on VerticaUtil.checkOutputSpecs — confirm.
+   *
+   * @param context
+   *          job context holding the output configuration
+   * @param test
+   *          true if testing
+   * @throws IOException
+   */
+  public void checkOutputSpecs(JobContext context, boolean test)
+      throws IOException {
+    checkOutputSpecs(context);
+  }
+
+  /**
+   * Create a record writer that streams records into the output table with a
+   * COPY statement.
+   *
+   * @param context
+   *          task context supplying the job configuration and job name
+   * @return the record writer
+   * @throws IOException
+   *           if the connection or writer cannot be created
+   */
+  @Override
+  public RecordWriter<Text, VerticaRecord> getRecordWriter(
+      TaskAttemptContext context) throws IOException {
+
+    VerticaConfiguration config = new VerticaConfiguration(context
+        .getConfiguration());
+
+    // Read the formats from the job configuration. checkOutputSpecs typically
+    // runs on a different (client-side) instance, so relying on the fields
+    // left them at their defaults here.
+    delimiter = config.getOutputDelimiter();
+    terminator = config.getOutputRecordTerminator();
+
+    String name = context.getJobName();
+    // TODO: use explicit date formats
+    String table = config.getOutputTableName();
+    String copyStmt = "COPY " + table + " FROM STDIN" + " DELIMITER '"
+        + escapeLiteral(delimiter) + "' RECORD TERMINATOR '"
+        + escapeLiteral(terminator) + "' STREAM NAME '" + escapeLiteral(name)
+        + "' DIRECT";
+
+    try {
+      Connection conn = config.getConnection(true);
+      return new VerticaRecordWriter(conn, copyStmt, table, delimiter,
+          terminator);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Return the record produced by {@code VerticaRecordWriter.getValue()} for
+   * the configured output table.
+   *
+   * NOTE(review): presumably the writer uses {@code conn} to look up the
+   * table's columns; the connection is not closed here — confirm whether the
+   * returned record still depends on it.
+   *
+   * @param conf
+   *          job configuration naming the output table
+   * @return the record from the writer
+   * @throws Exception
+   *           if the connection or lookup fails
+   */
+  public static VerticaRecord getValue(Configuration conf) throws Exception {
+    VerticaConfiguration config = new VerticaConfiguration(conf);
+    String table = config.getOutputTableName();
+    Connection conn = config.getConnection(true);
+    return (new VerticaRecordWriter(conn, "", table, config
+        .getOutputDelimiter(), config.getOutputRecordTerminator())).getValue();
+  }
+
+  /**
+   * Optionally called at the end of a job to optimize any newly created and
+   * loaded tables. Useful for new tables with more than 100k records.
+   *
+   * Creates a projection design for the output table, runs its script, starts
+   * a refresh, polls until none of the table's projections is still
+   * refreshing, then moves the AHM and drops the projections that existed
+   * before the design was applied. The statement and connection are closed
+   * on exit.
+   *
+   * @param conf
+   *          job configuration naming the output table
+   * @throws Exception
+   *           if any step fails, or if the table has no projections
+   */
+  public static void optimize(Configuration conf) throws Exception {
+    VerticaConfiguration vtconfig = new VerticaConfiguration(conf);
+    Connection conn = vtconfig.getConnection(true);
+    Statement stmt = null;
+    try {
+      // TODO: consider more tables and skip tables with non-temp projections
+      String tableName = vtconfig.getOutputTableName();
+      stmt = conn.createStatement();
+      ResultSet rs = null;
+      StringBuffer designTables = new StringBuffer(tableName);
+      HashSet<String> tablesWithTemp = new HashSet<String>();
+
+      // fully qualify the table name - defaults to public.<table>
+      if (tableName.indexOf(".") == -1) {
+        tableName = "public." + tableName;
+      }
+
+      // for now just add the single output table
+      tablesWithTemp.add(tableName);
+
+      // map from table name to set of projection names
+      HashMap<String, Collection<String>> tableProj =
+          new HashMap<String, Collection<String>>();
+      rs = stmt.executeQuery("select schemaname, anchortablename, projname from vt_projection;");
+      while (rs.next()) {
+        String ptable = rs.getString(1) + "." + rs.getString(2);
+        if (!tableProj.containsKey(ptable)) {
+          tableProj.put(ptable, new HashSet<String>());
+        }
+
+        tableProj.get(ptable).add(rs.getString(3));
+      }
+
+      for (String table : tablesWithTemp) {
+        if (!tableProj.containsKey(table)) {
+          throw new RuntimeException("Cannot optimize table with no data: " + table);
+        }
+      }
+
+      String designName = Integer.toString(conn.hashCode());
+      stmt.execute("select create_projection_design('" + designName + "', '', '"
+          + designTables.toString() + "')");
+
+      rs = stmt.executeQuery("select get_design_script('" + designName + "', '"
+          + designName + "')");
+      rs.next();
+      String[] projSet = rs.getString(1).split(";");
+      for (String proj : projSet) {
+        stmt.execute(proj);
+      }
+      stmt.execute("select start_refresh()");
+
+      // poll until no projection of our tables is still refreshing; sleep only
+      // while work remains instead of once more after the refresh finished
+      Long timeout = vtconfig.getOptimizePollTimeout();
+      while (true) {
+        boolean refreshing = false;
+        rs = stmt
+            .executeQuery("select table_name, projection_name, status from vt_projection_refresh");
+        while (rs.next()) {
+          String table = rs.getString(1);
+          String stat = rs.getString(3);
+          if ("refreshing".equals(stat) && tablesWithTemp.contains(table))
+            refreshing = true;
+        }
+        if (!refreshing)
+          break;
+        Thread.sleep(timeout);
+      }
+
+      // refresh done, move the ahm and drop the temp projections
+      stmt.execute("select make_ahm_now()");
+
+      for (String table : tablesWithTemp) {
+        for (String proj : tableProj.get(table)) {
+          stmt.execute("DROP PROJECTION " + proj);
+        }
+      }
+    } finally {
+      try {
+        if (stmt != null)
+          stmt.close();
+      } finally {
+        conn.close();
+      }
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    return new FileOutputCommitter(FileOutputFormat.getOutputPath(context),
+        context);
+  }
+
+  /**
+   * Escape a value for use inside a single-quoted SQL string literal by
+   * doubling any embedded single quotes. A null value is returned unchanged.
+   */
+  private static String escapeLiteral(String value) {
+    return value == null ? null : value.replace("'", "''");
+  }
+}
Added: hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaRecord.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaRecord.java?rev=816735&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaRecord.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaRecord.java Fri Sep 18 18:24:31 2009
@@ -0,0 +1,631 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.vertica;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.VIntWritable;
+import org.apache.hadoop.io.VLongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Serializable record for records returned from and written to Vertica.
+ *
+ * A record holds parallel lists of column names, JDBC type codes
+ * ({@link Types}) and values. It can be filled row by row from a query
+ * {@link ResultSet}, rendered as a delimited line via {@link #toSQLString},
+ * and serialized through the {@link Writable} contract.
+ */
+public class VerticaRecord implements Writable {
+  ResultSet results = null;
+  ResultSetMetaData meta = null;
+  int columns = 0;
+  List<Integer> types = null;
+  List<Object> values = null;
+  List<String> names = null;
+  boolean dateString;
+  String delimiter = VerticaConfiguration.DELIMITER;
+  String terminator = VerticaConfiguration.RECORD_TERMINATER;
+
+  DateFormat datefmt = new SimpleDateFormat("yyyyMMdd");
+  DateFormat timefmt = new SimpleDateFormat("HHmmss");
+  DateFormat tmstmpfmt = new SimpleDateFormat("yyyyMMddHHmmss");
+  DateFormat sqlfmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+  /** @return the column values, in column order */
+  public List<Object> getValues() {
+    return values;
+  }
+
+  /** @return the JDBC type codes ({@link Types}), in column order */
+  public List<Integer> getTypes() {
+    return types;
+  }
+
+  /**
+   * Create a new VerticaRecord class out of a query result set.
+   *
+   * Names and types come from the result set metadata; values start as null
+   * and are filled by {@link #next()}.
+   *
+   * @param results
+   *          ResultSet returned from running input split query
+   * @param dateString
+   *          True if dates should be marshaled as strings
+   * @throws SQLException
+   */
+  VerticaRecord(ResultSet results, boolean dateString) throws SQLException {
+    this.results = results;
+    this.dateString = dateString;
+    meta = results.getMetaData();
+    columns = meta.getColumnCount();
+    names = new ArrayList<String>(columns);
+    types = new ArrayList<Integer>(columns);
+    values = new ArrayList<Object>(columns);
+    for (int i = 0; i < columns; i++) {
+      // getColumnName, not getCatalogName: the catalog name identifies the
+      // column's database catalog, so lookups by column name never matched.
+      names.add(meta.getColumnName(i + 1));
+      types.add(meta.getColumnType(i + 1));
+      values.add(null);
+    }
+  }
+
+  /** Create an empty record, typically to be filled by {@link #readFields}. */
+  public VerticaRecord() {
+    this.types = new ArrayList<Integer>();
+    this.values = new ArrayList<Object>();
+  }
+
+  /**
+   * Create a record with the given column names and types and all values set
+   * to null.
+   *
+   * @param names
+   *          column names
+   * @param types
+   *          JDBC type codes, one per column
+   */
+  public VerticaRecord(List<String> names, List<Integer> types) {
+    this.names = names;
+    this.types = types;
+    values = new ArrayList<Object>();
+    for (@SuppressWarnings("unused")
+    Integer type : types)
+      values.add(null);
+    columns = values.size();
+  }
+
+  /**
+   * Create a record from values, inferring each column's type from the value's
+   * class.
+   *
+   * @param values
+   *          column values
+   * @param parseTypes
+   *          currently unused; types are always inferred
+   */
+  public VerticaRecord(List<Object> values, boolean parseTypes) {
+    this.types = new ArrayList<Integer>();
+    this.values = values;
+    columns = values.size();
+    objectTypes();
+  }
+
+  /**
+   * Test interface for junit tests that do not require a database.
+   *
+   * When {@code types} is empty the types are inferred from the values (and
+   * appended to the given list).
+   *
+   * @param names
+   *          column names
+   * @param types
+   *          JDBC type codes, or an empty list to infer them
+   * @param values
+   *          column values
+   * @param dateString
+   *          True if dates should be marshaled as strings
+   */
+  public VerticaRecord(List<String> names, List<Integer> types,
+      List<Object> values, boolean dateString) {
+    this.names = names;
+    this.types = types;
+    this.values = values;
+    this.dateString = dateString;
+    if (types.size() == 0)
+      objectTypes();
+    // Count columns after inference; counting first left columns at 0 when
+    // types were inferred, so nothing was written or rendered.
+    columns = types.size();
+  }
+
+  /**
+   * Get a value by column name.
+   *
+   * @throws Exception
+   *           if the record has no column names
+   */
+  public Object get(String name) throws Exception {
+    if (names == null || names.size() == 0)
+      throw new Exception("Cannot get record by name if names not initialized");
+    int i = names.indexOf(name);
+    return get(i);
+  }
+
+  /** Get a value, 0 indexed. */
+  public Object get(int i) {
+    if (i >= values.size())
+      throw new IndexOutOfBoundsException("Index " + i
+          + " greater than input size " + values.size());
+    return values.get(i);
+  }
+
+  /**
+   * Set a value by column name.
+   *
+   * @throws Exception
+   *           if the record has no column names
+   */
+  public void set(String name, Object value) throws Exception {
+    if (names == null || names.size() == 0)
+      throw new Exception("Cannot set record by name if names not initialized");
+    int i = names.indexOf(name);
+    set(i, value);
+  }
+
+  /**
+   * set a value, 0 indexed
+   *
+   * @param i
+   */
+  public void set(Integer i, Object value) {
+    set(i, value, false);
+  }
+
+  /**
+   * set a value, 0 indexed, optionally checking that the value's class is
+   * compatible with the column's type
+   *
+   * @param i
+   * @throws ClassCastException
+   *           if validate is true and the value does not fit the column type
+   */
+  public void set(Integer i, Object value, boolean validate) {
+    if (i >= values.size())
+      throw new IndexOutOfBoundsException("Index " + i
+          + " greater than input size " + values.size());
+    if (validate) {
+      Integer type = types.get(i);
+      switch (type) {
+      case Types.BIGINT:
+        if (!(value instanceof Long) && !(value instanceof Integer)
+            && !(value instanceof Short) && !(value instanceof LongWritable)
+            && !(value instanceof VLongWritable)
+            && !(value instanceof VIntWritable))
+          throw new ClassCastException("Cannot cast "
+              + value.getClass().getName() + " to Long");
+        break;
+      case Types.INTEGER:
+        if (!(value instanceof Integer) && !(value instanceof Short)
+            && !(value instanceof VIntWritable))
+          throw new ClassCastException("Cannot cast "
+              + value.getClass().getName() + " to Integer");
+        break;
+      case Types.TINYINT:
+      case Types.SMALLINT:
+        if (!(value instanceof Short))
+          throw new ClassCastException("Cannot cast "
+              + value.getClass().getName() + " to Short");
+        break;
+      case Types.REAL:
+      case Types.DECIMAL:
+      case Types.NUMERIC:
+      case Types.DOUBLE:
+        if (!(value instanceof Double) && !(value instanceof Float)
+            && !(value instanceof DoubleWritable)
+            && !(value instanceof FloatWritable))
+          throw new ClassCastException("Cannot cast "
+              + value.getClass().getName() + " to Double");
+        break;
+      case Types.FLOAT:
+        if (!(value instanceof Float) && !(value instanceof FloatWritable))
+          throw new ClassCastException("Cannot cast "
+              + value.getClass().getName() + " to Float");
+        break;
+      case Types.BINARY:
+      case Types.LONGVARBINARY:
+      case Types.VARBINARY:
+        if (!(value instanceof byte[]) && !(value instanceof BytesWritable))
+          throw new ClassCastException("Cannot cast "
+              + value.getClass().getName() + " to byte[]");
+        break;
+      case Types.BIT:
+      case Types.BOOLEAN:
+        if (!(value instanceof Boolean) && !(value instanceof BooleanWritable)
+            && !(value instanceof ByteWritable))
+          throw new ClassCastException("Cannot cast "
+              + value.getClass().getName() + " to Boolean");
+        break;
+      case Types.CHAR:
+        if (!(value instanceof Character) && !(value instanceof String))
+          throw new ClassCastException("Cannot cast "
+              + value.getClass().getName() + " to Character");
+        break;
+      case Types.LONGNVARCHAR:
+      case Types.LONGVARCHAR:
+      case Types.NCHAR:
+      case Types.NVARCHAR:
+      case Types.VARCHAR:
+        if (!(value instanceof String) && !(value instanceof Text))
+          throw new ClassCastException("Cannot cast "
+              + value.getClass().getName() + " to String");
+        break;
+      case Types.DATE:
+        if (!(value instanceof Date) && !(value instanceof java.util.Date))
+          throw new ClassCastException("Cannot cast "
+              + value.getClass().getName() + " to Date");
+        break;
+      case Types.TIME:
+        if (!(value instanceof Time) && !(value instanceof java.util.Date))
+          throw new ClassCastException("Cannot cast "
+              + value.getClass().getName() + " to Time");
+        break;
+      case Types.TIMESTAMP:
+        if (!(value instanceof Timestamp) && !(value instanceof java.util.Date))
+          throw new ClassCastException("Cannot cast "
+              + value.getClass().getName() + " to Timestamp");
+        break;
+      default:
+        throw new RuntimeException("Unknown type value " + types.get(i));
+      }
+    }
+    values.set(i, value);
+  }
+
+  /**
+   * Advance the underlying result set and copy the current row's values.
+   *
+   * @return true if a row was read, false when the result set is exhausted
+   * @throws SQLException
+   */
+  public boolean next() throws SQLException {
+    if (results.next()) {
+      for (int i = 1; i <= columns; i++)
+        values.set(i - 1, results.getObject(i));
+      return true;
+    }
+    return false;
+  }
+
+  /** Append an inferred JDBC type code to {@code types} for each value. */
+  private void objectTypes() {
+    for (Object obj : values) {
+      if (obj == null) {
+        this.types.add(null);
+      } else if (obj instanceof Long) {
+        this.types.add(Types.BIGINT);
+      } else if (obj instanceof LongWritable) {
+        this.types.add(Types.BIGINT);
+      } else if (obj instanceof VLongWritable) {
+        this.types.add(Types.BIGINT);
+      } else if (obj instanceof VIntWritable) {
+        this.types.add(Types.INTEGER);
+      } else if (obj instanceof Integer) {
+        this.types.add(Types.INTEGER);
+      } else if (obj instanceof Short) {
+        this.types.add(Types.SMALLINT);
+      } else if (obj instanceof DoubleWritable) {
+        this.types.add(Types.DOUBLE);
+      } else if (obj instanceof Double) {
+        this.types.add(Types.DOUBLE);
+      } else if (obj instanceof Float) {
+        this.types.add(Types.FLOAT);
+      } else if (obj instanceof FloatWritable) {
+        this.types.add(Types.FLOAT);
+      } else if (obj instanceof byte[]) {
+        this.types.add(Types.BINARY);
+      } else if (obj instanceof ByteWritable) {
+        this.types.add(Types.BINARY);
+      } else if (obj instanceof Boolean) {
+        this.types.add(Types.BOOLEAN);
+      } else if (obj instanceof BooleanWritable) {
+        this.types.add(Types.BOOLEAN);
+      } else if (obj instanceof Character) {
+        this.types.add(Types.CHAR);
+      } else if (obj instanceof String) {
+        this.types.add(Types.VARCHAR);
+      } else if (obj instanceof BytesWritable) {
+        this.types.add(Types.VARCHAR);
+      } else if (obj instanceof Text) {
+        this.types.add(Types.VARCHAR);
+      } else if (obj instanceof Timestamp) {
+        // Timestamp and Time extend java.util.Date, so they must be tested
+        // before the generic Date check or they are mis-typed as DATE.
+        this.types.add(Types.TIMESTAMP);
+      } else if (obj instanceof Time) {
+        this.types.add(Types.TIME);
+      } else if (obj instanceof java.util.Date) {
+        // covers both java.sql.Date and plain java.util.Date
+        this.types.add(Types.DATE);
+      } else {
+        throw new RuntimeException("Unknown type " + obj.getClass().getName()
+            + " passed to Vertica Record");
+      }
+    }
+  }
+
+  /** Render the record with the default delimiter and record terminator. */
+  public String toSQLString() {
+    return toSQLString(delimiter, terminator);
+  }
+
+  /**
+   * Render the record as one delimited line.
+   *
+   * @param delimiterArg
+   *          text placed between columns
+   * @param terminatorArg
+   *          text appended after the last column
+   * @return the rendered record
+   */
+  public String toSQLString(String delimiterArg, String terminatorArg) {
+    StringBuffer sb = new StringBuffer();
+    for (int i = 0; i < columns; i++) {
+      Object obj = values.get(i);
+      Integer type = types.get(i);
+
+      // switch statement uses fall through to handle type variations
+      // e.g. type specified as BIGINT but passed in as Integer
+      switch (type) {
+      case Types.BIGINT:
+        if (obj instanceof Long) {
+          sb.append(((Long) obj).toString());
+          break;
+        }
+      case Types.INTEGER:
+        if (obj instanceof Integer) {
+          sb.append(((Integer) obj).toString());
+          break;
+        }
+      case Types.TINYINT:
+      case Types.SMALLINT:
+        if (obj instanceof Short) {
+          sb.append(((Short) obj).toString());
+          break;
+        }
+        if (obj instanceof LongWritable) {
+          sb.append(((LongWritable) obj).get());
+          break;
+        }
+        if (obj instanceof VLongWritable) {
+          sb.append(((VLongWritable) obj).get());
+          break;
+        }
+        if (obj instanceof VIntWritable) {
+          sb.append(((VIntWritable) obj).get());
+          break;
+        }
+      case Types.REAL:
+      case Types.DECIMAL:
+      case Types.NUMERIC:
+      case Types.DOUBLE:
+        if (obj instanceof Double) {
+          sb.append(((Double) obj).toString());
+          break;
+        }
+        if (obj instanceof DoubleWritable) {
+          sb.append(((DoubleWritable) obj).toString());
+          break;
+        }
+      case Types.FLOAT:
+        if (obj instanceof Float) {
+          sb.append(((Float) obj).toString());
+          break;
+        }
+        if (obj instanceof FloatWritable) {
+          sb.append(((FloatWritable) obj).get());
+          break;
+        }
+      case Types.BINARY:
+      case Types.LONGVARBINARY:
+      case Types.VARBINARY:
+        sb.append(ByteBuffer.wrap((byte[]) obj).asCharBuffer());
+        break;
+      case Types.BIT:
+      case Types.BOOLEAN:
+        if (obj instanceof Boolean) {
+          if ((Boolean) obj)
+            sb.append("true");
+          else
+            sb.append("false");
+          break;
+        }
+        if (obj instanceof BooleanWritable) {
+          if (((BooleanWritable) obj).get())
+            sb.append("true");
+          else
+            sb.append("false");
+          break;
+        }
+      case Types.LONGNVARCHAR:
+      case Types.LONGVARCHAR:
+      case Types.NCHAR:
+      case Types.NVARCHAR:
+      case Types.VARCHAR:
+        if (obj instanceof String) {
+          sb.append((String) obj);
+          break;
+        }
+        if (obj instanceof Text) {
+          // Text is mapped to VARCHAR by objectTypes; previously it fell
+          // through every case and was silently dropped
+          sb.append(obj.toString());
+          break;
+        }
+        // Decode raw bytes as UTF-8; appending the array itself emitted its
+        // identity string (e.g. "[B@1a2b3c") instead of the data.
+        if (obj instanceof byte[]) {
+          byte[] bytes = (byte[]) obj;
+          sb.append(decodeUtf8(bytes, bytes.length));
+          break;
+        }
+        if (obj instanceof BytesWritable) {
+          // only the first getLength() bytes of the backing array are valid
+          BytesWritable bw = (BytesWritable) obj;
+          sb.append(decodeUtf8(bw.getBytes(), bw.getLength()));
+          break;
+        }
+      case Types.CHAR:
+        if (obj instanceof Character) {
+          sb.append((Character) obj);
+          break;
+        }
+        if (obj instanceof ByteWritable) {
+          sb.append(((ByteWritable) obj).get());
+          break;
+        }
+      case Types.DATE:
+      case Types.TIME:
+      case Types.TIMESTAMP:
+        // java.sql.Date, Time and Timestamp all extend java.util.Date
+        if (obj instanceof java.util.Date)
+          sb.append(sqlfmt.format((java.util.Date) obj));
+        break;
+      default:
+        throw new RuntimeException("Unknown type value " + types.get(i));
+      }
+      if (i < columns - 1)
+        sb.append(delimiterArg);
+      else
+        sb.append(terminatorArg);
+    }
+    return sb.toString();
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    columns = in.readInt();
+    if (types.size() > 0)
+      types.clear();
+    // Clear old values too: appending to a reused (or pre-sized) list left
+    // stale entries in front of the freshly read ones.
+    if (values.size() > 0)
+      values.clear();
+    for (int i = 0; i < columns; i++)
+      types.add(in.readInt());
+
+    for (int i = 0; i < columns; i++) {
+      int type = types.get(i);
+      switch (type) {
+      case Types.BIGINT:
+        values.add(in.readLong());
+        break;
+      case Types.INTEGER:
+        values.add(in.readInt());
+        break;
+      case Types.TINYINT:
+      case Types.SMALLINT:
+        values.add(in.readShort());
+        break;
+      case Types.REAL:
+      case Types.DECIMAL:
+      case Types.NUMERIC:
+      case Types.DOUBLE:
+        values.add(in.readDouble());
+        break;
+      case Types.FLOAT:
+        values.add(in.readFloat());
+        break;
+      case Types.BINARY:
+      case Types.LONGVARBINARY:
+      case Types.VARBINARY:
+        values.add(StringUtils.hexStringToByte(Text.readString(in)));
+        break;
+      case Types.BIT:
+      case Types.BOOLEAN:
+        values.add(in.readBoolean());
+        break;
+      case Types.CHAR:
+        values.add(in.readChar());
+        break;
+      case Types.LONGNVARCHAR:
+      case Types.LONGVARCHAR:
+      case Types.NCHAR:
+      case Types.NVARCHAR:
+      case Types.VARCHAR:
+        values.add(Text.readString(in));
+        break;
+      case Types.DATE:
+        if (dateString)
+          try {
+            values.add(new Date(datefmt.parse(Text.readString(in)).getTime()));
+          } catch (ParseException e) {
+            throw new IOException(e);
+          }
+        else
+          values.add(new Date(in.readLong()));
+        break;
+      case Types.TIME:
+        if (dateString)
+          try {
+            values.add(new Time(timefmt.parse(Text.readString(in)).getTime()));
+          } catch (ParseException e) {
+            throw new IOException(e);
+          }
+        else
+          values.add(new Time(in.readLong()));
+        break;
+      case Types.TIMESTAMP:
+        if (dateString)
+          try {
+            values.add(new Timestamp(tmstmpfmt.parse(Text.readString(in))
+                .getTime()));
+          } catch (ParseException e) {
+            throw new IOException(e);
+          }
+        else
+          values.add(new Timestamp(in.readLong()));
+        break;
+      default:
+        throw new IOException("Unknown type value " + type);
+      }
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(columns);
+    for (Integer type : types)
+      out.writeInt(type);
+
+    for (int i = 0; i < columns; i++) {
+      Object obj = values.get(i);
+      Integer type = types.get(i);
+
+      switch (type) {
+      case Types.BIGINT:
+        out.writeLong((Long) obj);
+        break;
+      case Types.INTEGER:
+        out.writeInt((Integer) obj);
+        break;
+      case Types.TINYINT:
+      case Types.SMALLINT:
+        out.writeShort((Short) obj);
+        break;
+      case Types.REAL:
+      case Types.DECIMAL:
+      case Types.NUMERIC:
+      case Types.DOUBLE:
+        out.writeDouble((Double) obj);
+        break;
+      case Types.FLOAT:
+        out.writeFloat((Float) obj);
+        break;
+      case Types.BINARY:
+      case Types.LONGVARBINARY:
+      case Types.VARBINARY:
+        Text.writeString(out, StringUtils.byteToHexString((byte[]) obj));
+        break;
+      case Types.BIT:
+      case Types.BOOLEAN:
+        out.writeBoolean((Boolean) obj);
+        break;
+      case Types.CHAR:
+        out.writeChar((Character) obj);
+        break;
+      case Types.LONGNVARCHAR:
+      case Types.LONGVARCHAR:
+      case Types.NCHAR:
+      case Types.NVARCHAR:
+      case Types.VARCHAR:
+        // Text values are typed VARCHAR by objectTypes; casting them to
+        // String threw ClassCastException
+        if (obj instanceof Text)
+          Text.writeString(out, obj.toString());
+        else
+          Text.writeString(out, (String) obj);
+        break;
+      case Types.DATE:
+        // java.sql.Date is a java.util.Date, so a single branch covers both
+        if (dateString)
+          Text.writeString(out, datefmt.format((java.util.Date) obj));
+        else
+          out.writeLong(((java.util.Date) obj).getTime());
+        break;
+      case Types.TIME:
+        if (dateString)
+          Text.writeString(out, timefmt.format((Time) obj));
+        else
+          out.writeLong(((Time) obj).getTime());
+        break;
+      case Types.TIMESTAMP:
+        if (dateString)
+          Text.writeString(out, tmstmpfmt.format((Timestamp) obj));
+        else
+          out.writeLong(((Timestamp) obj).getTime());
+        break;
+      default:
+        throw new IOException("Unknown type value " + types.get(i));
+      }
+    }
+  }
+
+  /** Decode the first {@code length} bytes of {@code bytes} as UTF-8 text. */
+  private static String decodeUtf8(byte[] bytes, int length) {
+    Text text = new Text();
+    text.set(bytes, 0, length);
+    return text.toString();
+  }
+
+}
Added: hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaRecordReader.java?rev=816735&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaRecordReader.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaRecordReader.java Fri Sep 18 18:24:31 2009
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.vertica;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+public class VerticaRecordReader extends
+ RecordReader<LongWritable, VerticaRecord> {
+ ResultSet results = null;
+ long start = 0;
+ int pos = 0;
+ long length = 0;
+ VerticaInputSplit split = null;
+ LongWritable key = null;
+ VerticaRecord value = null;
+
+ public VerticaRecordReader(VerticaInputSplit split, Configuration job)
+ throws Exception {
+ // run query for this segment
+ this.split = split;
+ split.configure(job);
+ start = split.getStart();
+ length = split.getLength();
+ results = split.executeQuery();
+ }
+
+ /** {@inheritDoc} */
+ public void initialize(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ key = new LongWritable();
+ try {
+ pos++;
+ value = new VerticaRecord(results, false);
+ } catch (SQLException e) {
+ e.printStackTrace();
+ throw new IOException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ public void close() throws IOException {
+ try {
+ split.close();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ throw new IOException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ public long getPos() throws IOException {
+ return pos;
+ }
+
+ /** {@inheritDoc} */
+ public float getProgress() throws IOException {
+ // TODO: figure out why length would be 0
+ if (length == 0)
+ return 1;
+ return pos / length;
+ }
+
+ /** {@inheritDoc} */
+ public boolean next(LongWritable key, VerticaRecord value) throws IOException {
+ key.set(pos + start);
+ pos++;
+ try {
+ return value.next();
+ } catch (SQLException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public LongWritable getCurrentKey() throws IOException, InterruptedException {
+ return key;
+ }
+
+ @Override
+ public VerticaRecord getCurrentValue() throws IOException,
+ InterruptedException {
+ return value;
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ key.set(pos + start);
+ pos++;
+ try {
+ return value.next();
+ } catch (SQLException e) {
+ throw new IOException(e);
+ }
+ }
+
+}
Added: hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaRecordWriter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaRecordWriter.java?rev=816735&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaRecordWriter.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaRecordWriter.java Fri Sep 18 18:24:31 2009
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.vertica;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.Method;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+public class VerticaRecordWriter extends RecordWriter<Text, VerticaRecord> {
+ String writerTable = null;
+ Connection connection = null;
+ Statement statement = null; // com.vertica.PGStatement
+ String copyStmt = null;
+ String delimiter = VerticaConfiguration.DELIMITER;
+ String terminator = VerticaConfiguration.RECORD_TERMINATER;
+
+ // Methods from com.vertica.PGStatement
+ Method startCopyIn = null;
+ Method finishCopyIn = null;
+ Method addStreamToCopyIn = null;
+
+ public VerticaRecordWriter(Connection connection, String copyStmt,
+ String writerTable, String delimiter, String terminator) {
+ this.connection = connection;
+ this.copyStmt = copyStmt;
+ this.writerTable = writerTable;
+ this.delimiter = delimiter;
+ this.terminator = terminator;
+
+ try {
+ startCopyIn = Class.forName("com.vertica.PGStatement").getMethod(
+ "startCopyIn", String.class, InputStream.class);
+ finishCopyIn = Class.forName("com.vertica.PGStatement").getMethod(
+ "finishCopyIn");
+ addStreamToCopyIn = Class.forName("com.vertica.PGStatement").getMethod(
+ "addStreamToCopyIn", InputStream.class);
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Vertica Formatter requies a the Vertica jdbc driver");
+ }
+ }
+
+ public VerticaRecord getValue() throws SQLException {
+ DatabaseMetaData dbmd = connection.getMetaData();
+
+ String schema = null;
+ String table = null;
+ String[] schemaTable = writerTable.split("\\.");
+ if (schemaTable.length == 2) {
+ schema = schemaTable[0];
+ table = schemaTable[1];
+ } else if (schemaTable.length == 1) {
+ table = schemaTable[0];
+ } else {
+ throw new RuntimeException(
+ "Vertica Formatter requires a value output table");
+ }
+
+ List<Integer> types = new ArrayList<Integer>();
+ List<String> names = new ArrayList<String>();
+ ResultSet rs = dbmd.getColumns(null, schema, table, null);
+ while (rs.next()) {
+ names.add(rs.getString(4));
+ types.add(rs.getInt(5));
+ }
+
+ VerticaRecord record = new VerticaRecord(names, types);
+ return record;
+ }
+
+ @Override
+ public void close(TaskAttemptContext context) throws IOException {
+ try {
+ if (statement != null)
+ finishCopyIn.invoke(statement); // statement.finishCopyIn();
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void write(Text table, VerticaRecord record) throws IOException {
+ if (!table.toString().equals(writerTable))
+ throw new IOException("Writing to different table " + table.toString()
+ + ". Expecting " + writerTable);
+
+ String strRecord = record.toSQLString(delimiter, terminator);
+
+ ByteArrayInputStream bais = new ByteArrayInputStream(strRecord.getBytes());
+ try {
+ if (statement == null) {
+ statement = connection.createStatement();
+ startCopyIn.invoke(statement, copyStmt, bais); // statement.startCopyIn(copyStmt,
+ // bais);
+ } else
+ addStreamToCopyIn.invoke(statement, bais); // statement.addStreamToCopyIn(bais);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+}
Added: hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaStreamingInput.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaStreamingInput.java?rev=816735&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaStreamingInput.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaStreamingInput.java Fri Sep 18 18:24:31 2009
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.vertica;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+public class VerticaStreamingInput extends InputFormat<Text, Text> {
+
+ public RecordReader<Text, Text> createRecordReader(InputSplit split,
+ TaskAttemptContext context) throws IOException {
+ try {
+ return new VerticaStreamingRecordReader((VerticaInputSplit) split,
+ context.getConfiguration());
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public List<InputSplit> getSplits(JobContext context) throws IOException {
+ return VerticaUtil.getSplits(context);
+ }
+}
Added: hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaStreamingOutput.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaStreamingOutput.java?rev=816735&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaStreamingOutput.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/vertica/src/java/org/apache/hadoop/vertica/VerticaStreamingOutput.java Fri Sep 18 18:24:31 2009
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.vertica;
+
+import java.io.IOException;
+import java.sql.Connection;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+public class VerticaStreamingOutput extends OutputFormat<Text, Text> {
+ private static final Log LOG = LogFactory
+ .getLog(VerticaStreamingOutput.class);
+
+ String delimiter = VerticaConfiguration.DELIMITER;
+ String terminator = VerticaConfiguration.RECORD_TERMINATER;
+
+ public void checkOutputSpecs(JobContext context) throws IOException {
+ Configuration conf = context.getConfiguration();
+ VerticaUtil.checkOutputSpecs(conf);
+ VerticaConfiguration vtconfig = new VerticaConfiguration(conf);
+ delimiter = vtconfig.getOutputDelimiter();
+ terminator = vtconfig.getOutputRecordTerminator();
+ LOG.debug("Vertica output using delimiter '" + delimiter
+ + "' and terminator '" + terminator + "'");
+ }
+
+ public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context)
+ throws IOException {
+ Configuration conf = context.getConfiguration();
+ VerticaConfiguration vtconfig = new VerticaConfiguration(conf);
+
+ String name = context.getJobName();
+ delimiter = vtconfig.getOutputDelimiter();
+ terminator = vtconfig.getOutputRecordTerminator();
+
+ // TODO: use explicit date formats
+ String table = vtconfig.getOutputTableName();
+ String copyStmt = "COPY " + table + " FROM STDIN" + " DELIMITER '"
+ + delimiter + "' RECORD TERMINATOR '" + terminator + "' STREAM NAME '"
+ + name + "' DIRECT";
+
+ try {
+ Connection conn = vtconfig.getConnection(true);
+ return new VerticaStreamingRecordWriter(conn, copyStmt, table);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new FileOutputCommitter(FileOutputFormat.getOutputPath(context),
+ context);
+ }
+}
|