Author: tomwhite Date: Tue Sep 8 12:52:55 2009 New Revision: 812486 URL: http://svn.apache.org/viewvc?rev=812486&view=rev Log: MAPREDUCE-938. Postgresql support for Sqoop. Contributed by Aaron Kimball. Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/PostgresqlManager.java hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/DirectImportUtils.java hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/JdbcUrl.java hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/PostgresqlTest.java Modified: hadoop/mapreduce/trunk/CHANGES.txt hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DefaultManagerFactory.java hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/GenericJdbcManager.java hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/ThirdPartyTests.java hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/LocalMySQLTest.java hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/testutil/ImportJobTestCase.java Modified: hadoop/mapreduce/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=812486&r1=812485&r2=812486&view=diff ============================================================================== --- hadoop/mapreduce/trunk/CHANGES.txt (original) +++ hadoop/mapreduce/trunk/CHANGES.txt Tue Sep 8 12:52:55 2009 @@ -91,6 +91,8 @@ logs and generating job traces for simulation and analysis. (Dick King via cdouglas) + MAPREDUCE-938. Postgresql support for Sqoop. 
(Aaron Kimball via tomwhite) + IMPROVEMENTS MAPREDUCE-816. Rename "local" mysql import to "direct" in Sqoop. Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DefaultManagerFactory.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DefaultManagerFactory.java?rev=812486&r1=812485&r2=812486&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DefaultManagerFactory.java (original) +++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DefaultManagerFactory.java Tue Sep 8 12:52:55 2009 @@ -81,6 +81,12 @@ } else { return new MySQLManager(options); } + } else if (scheme.equals("jdbc:postgresql:")) { + if (options.isDirect()) { + return new DirectPostgresqlManager(options); + } else { + return new PostgresqlManager(options); + } } else if (scheme.startsWith("jdbc:hsqldb:")) { return new HsqldbManager(options); } else if (scheme.startsWith("jdbc:oracle:")) { Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java?rev=812486&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java (added) +++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java Tue Sep 8 12:52:55 2009 @@ -0,0 +1,472 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.sqoop.manager; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.net.InetAddress; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.sqoop.ImportOptions; +import org.apache.hadoop.sqoop.util.DirectImportUtils; +import org.apache.hadoop.sqoop.util.Executor; +import org.apache.hadoop.sqoop.util.ImportError; +import org.apache.hadoop.sqoop.util.JdbcUrl; +import org.apache.hadoop.sqoop.util.PerfCounters; +import org.apache.hadoop.sqoop.util.StreamHandlerFactory; + +/** + * Manages direct dumps from Postgresql databases via psql COPY TO STDOUT + * commands. + */ +public class DirectPostgresqlManager extends PostgresqlManager { + public static final Log LOG = LogFactory.getLog(DirectPostgresqlManager.class.getName()); + + public DirectPostgresqlManager(final ImportOptions opts) { + // Inform superclass that we're overriding import method via alt. constructor. 
+ super(opts, true); + } + + private static final String PSQL_CMD = "psql"; + + /** Copies data directly into HDFS, adding the user's chosen line terminator + char to each record. + */ + static class PostgresqlStreamHandlerFactory implements StreamHandlerFactory { + private final BufferedWriter writer; + private final PerfCounters counters; + private final ImportOptions options; + + PostgresqlStreamHandlerFactory(final BufferedWriter w, final ImportOptions opts, + final PerfCounters ctrs) { + this.writer = w; + this.options = opts; + this.counters = ctrs; + } + + private PostgresqlStreamThread child; + + public void processStream(InputStream is) { + child = new PostgresqlStreamThread(is, writer, options, counters); + child.start(); + } + + public int join() throws InterruptedException { + child.join(); + if (child.isErrored()) { + return 1; + } else { + return 0; + } + } + + private static class PostgresqlStreamThread extends Thread { + public static final Log LOG = LogFactory.getLog(PostgresqlStreamThread.class.getName()); + + private final BufferedWriter writer; + private final InputStream stream; + private final ImportOptions options; + private final PerfCounters counters; + + private boolean error; + + PostgresqlStreamThread(final InputStream is, final BufferedWriter w, + final ImportOptions opts, final PerfCounters ctrs) { + this.stream = is; + this.writer = w; + this.options = opts; + this.counters = ctrs; + } + + public boolean isErrored() { + return error; + } + + public void run() { + BufferedReader r = null; + BufferedWriter w = this.writer; + + char recordDelim = this.options.getOutputRecordDelim(); + + try { + r = new BufferedReader(new InputStreamReader(this.stream)); + + // read/write transfer loop here. 
+ while (true) { + String inLine = r.readLine(); + if (null == inLine) { + break; // EOF + } + + w.write(inLine); + w.write(recordDelim); + counters.addBytes(1 + inLine.length()); + } + } catch (IOException ioe) { + LOG.error("IOException reading from psql: " + ioe.toString()); + // set the error bit so our caller can see that something went wrong. + error = true; + } finally { + if (null != r) { + try { + r.close(); + } catch (IOException ioe) { + LOG.info("Error closing FIFO stream: " + ioe.toString()); + } + } + + if (null != w) { + try { + w.close(); + } catch (IOException ioe) { + LOG.info("Error closing HDFS stream: " + ioe.toString()); + } + } + } + } + } + } + + /** + Takes a list of columns and turns them into a string like "col1, col2, col3..." + */ + private String getColumnListStr(String [] cols) { + if (null == cols) { + return null; + } + + StringBuilder sb = new StringBuilder(); + boolean first = true; + for (String col : cols) { + if (!first) { + sb.append(", "); + } + sb.append(col); + first = false; + } + + return sb.toString(); + } + + /** + * @return the Postgresql-specific SQL command to copy the table ("COPY .... TO STDOUT") + */ + private String getCopyCommand(String tableName) { + /* + Format of this command is: + + COPY table(col, col....) TO STDOUT + or COPY ( query ) TO STDOUT + WITH DELIMITER 'fieldsep' + CSV + QUOTE 'quotechar' + ESCAPE 'escapechar' + FORCE QUOTE col, col, col.... + */ + + StringBuilder sb = new StringBuilder(); + String [] cols = getColumnNames(tableName); + + sb.append("COPY "); + String whereClause = this.options.getWhereClause(); + if (whereClause != null && whereClause.length() > 0) { + // Import from a SELECT QUERY + sb.append("("); + sb.append("SELECT "); + if (null != cols) { + sb.append(getColumnListStr(cols)); + } else { + sb.append("*"); + } + + sb.append(" FROM "); + sb.append(tableName); + sb.append(" WHERE "); + sb.append(whereClause); + sb.append(")"); + } else { + // Import just the table. 
+ sb.append(tableName); + if (null != cols) { + // specify columns. + sb.append("("); + sb.append(getColumnListStr(cols)); + sb.append(")"); + } + } + + // Translate delimiter characters to '\ooo' octal representation. + sb.append(" TO STDOUT WITH DELIMITER E'\\"); + sb.append(Integer.toString((int) this.options.getOutputFieldDelim(), 8)); + sb.append("' CSV "); + if (this.options.getOutputEnclosedBy() != '\0') { + sb.append("QUOTE E'\\"); + sb.append(Integer.toString((int) this.options.getOutputEnclosedBy(), 8)); + sb.append("' "); + } + if (this.options.getOutputEscapedBy() != '\0') { + sb.append("ESCAPE E'\\"); + sb.append(Integer.toString((int) this.options.getOutputEscapedBy(), 8)); + sb.append("' "); + } + + // add the "FORCE QUOTE col, col, col..." clause if quotes are required. + if (null != cols && this.options.isOutputEncloseRequired()) { + sb.append("FORCE QUOTE "); + sb.append(getColumnListStr(cols)); + } + + sb.append(";"); + + String copyCmd = sb.toString(); + LOG.debug("Copy command is " + copyCmd); + return copyCmd; + } + + /** Write the COPY command to a temp file + * @return the filename we wrote to. + */ + private String writeCopyCommand(String command) throws IOException { + String tmpDir = options.getTempDir(); + File tempFile = File.createTempFile("tmp-",".sql", new File(tmpDir)); + BufferedWriter w = new BufferedWriter( + new OutputStreamWriter(new FileOutputStream(tempFile))); + w.write(command); + w.newLine(); + w.close(); + return tempFile.toString(); + } + + /** Write the user's password to a file that is chmod 0600. + @return the filename. + */ + private String writePasswordFile(String password) throws IOException { + + String tmpDir = options.getTempDir(); + File tempFile = File.createTempFile("pgpass",".pgpass", new File(tmpDir)); + LOG.debug("Writing password to tempfile: " + tempFile); + + // Make sure it's only readable by the current user. 
+ DirectImportUtils.setFilePermissions(tempFile, "0600"); + + // Actually write the password data into the file. + BufferedWriter w = new BufferedWriter( + new OutputStreamWriter(new FileOutputStream(tempFile))); + w.write("*:*:*:*:" + password); + w.close(); + return tempFile.toString(); + } + + /** @return true if someHost refers to localhost. + */ + private boolean isLocalhost(String someHost) { + if (null == someHost) { + return false; + } + + try { + InetAddress localHostAddr = InetAddress.getLocalHost(); + InetAddress someAddr = InetAddress.getByName(someHost); + + return localHostAddr.equals(someAddr); + } catch (UnknownHostException uhe) { + return false; + } + } + + @Override + /** + * Import the table into HDFS by using psql to pull the data out of the db + * via COPY FILE TO STDOUT. + */ + public void importTable(String tableName, String jarFile, Configuration conf) + throws IOException, ImportError { + + LOG.info("Beginning psql fast path import"); + + if (options.getFileLayout() != ImportOptions.FileLayout.TextFile) { + // TODO(aaron): Support SequenceFile-based load-in + LOG.warn("File import layout" + options.getFileLayout() + + " is not supported by"); + LOG.warn("Postgresql direct import; import will proceed as text files."); + } + + String commandFilename = null; + String passwordFilename = null; + Process p = null; + StreamHandlerFactory streamHandler = null; + PerfCounters counters = new PerfCounters(); + + try { + // Get the COPY TABLE command to issue, write this to a file, and pass it + // in to psql with -f filename. + // Then make sure we delete this file in our finally block. + String copyCmd = getCopyCommand(tableName); + commandFilename = writeCopyCommand(copyCmd); + + // Arguments to pass to psql on the command line. + ArrayList args = new ArrayList(); + + // Environment to pass to psql. + List envp = Executor.getCurEnvpStrings(); + + // We need to parse the connect string URI to determine the database name + // and the host and port. 
If the host is localhost and the port is not specified, + // we don't want to pass this to psql, because we want to force the use of a + // UNIX domain socket, not a TCP/IP socket. + String connectString = options.getConnectString(); + String databaseName = JdbcUrl.getDatabaseName(connectString); + String hostname = JdbcUrl.getHostName(connectString); + int port = JdbcUrl.getPort(connectString); + + if (null == databaseName) { + throw new ImportError("Could not determine database name"); + } + + LOG.info("Performing import of table " + tableName + " from database " + databaseName); + args.add(PSQL_CMD); // requires that this is on the path. + args.add("--tuples-only"); + args.add("--quiet"); + + String username = options.getUsername(); + if (username != null) { + args.add("--username"); + args.add(username); + String password = options.getPassword(); + if (null != password) { + passwordFilename = writePasswordFile(password); + // Need to send PGPASSFILE environment variable specifying + // location of our postgres file. + envp.add("PGPASSFILE=" + passwordFilename); + } + } + + if (!isLocalhost(hostname) || port != -1) { + args.add("--host"); + args.add(hostname); + args.add("--port"); + args.add(Integer.toString(port)); + } + + if (null != databaseName && databaseName.length() > 0) { + args.add(databaseName); + } + + // The COPY command is in a script file. + args.add("-f"); + args.add(commandFilename); + + // begin the import in an external process. + LOG.debug("Starting psql with arguments:"); + for (String arg : args) { + LOG.debug(" " + arg); + } + + // This writer will be closed by StreamHandlerFactory. + OutputStream os = DirectImportUtils.createHdfsSink(conf, options, tableName); + BufferedWriter w = new BufferedWriter(new OutputStreamWriter(os)); + + // Actually start the psql dump. + p = Runtime.getRuntime().exec(args.toArray(new String[0]), envp.toArray(new String[0])); + + // read from the stdout pipe into the HDFS writer. 
+ InputStream is = p.getInputStream(); + streamHandler = new PostgresqlStreamHandlerFactory(w, options, counters); + + LOG.debug("Starting stream handler"); + counters.startClock(); + streamHandler.processStream(is); + } finally { + // block until the process is done. + LOG.debug("Waiting for process completion"); + int result = 0; + if (null != p) { + while (true) { + try { + result = p.waitFor(); + } catch (InterruptedException ie) { + // interrupted; loop around. + continue; + } + + break; + } + } + + // Remove any password file we wrote + if (null != passwordFilename) { + if (!new File(passwordFilename).delete()) { + LOG.error("Could not remove postgresql password file " + passwordFilename); + LOG.error("You should remove this file to protect your credentials."); + } + } + + if (null != commandFilename) { + // We wrote the COPY command to a tmpfile. Remove it. + if (!new File(commandFilename).delete()) { + LOG.info("Could not remove temp file: " + commandFilename); + } + } + + // block until the stream handler is done too. + int streamResult = 0; + if (null != streamHandler) { + while (true) { + try { + streamResult = streamHandler.join(); + } catch (InterruptedException ie) { + // interrupted; loop around. 
+ continue; + } + + break; + } + } + + LOG.info("Transfer loop complete."); + + if (0 != result) { + throw new IOException("psql terminated with status " + + Integer.toString(result)); + } + + if (0 != streamResult) { + throw new IOException("Encountered exception in stream handler"); + } + + counters.stopClock(); + LOG.info("Transferred " + counters.toString()); + } + } +} Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/GenericJdbcManager.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/GenericJdbcManager.java?rev=812486&r1=812485&r2=812486&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/GenericJdbcManager.java (original) +++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/GenericJdbcManager.java Tue Sep 8 12:52:55 2009 @@ -56,10 +56,15 @@ return this.connection; } + protected boolean hasOpenConnection() { + return this.connection != null; + } + public void close() throws SQLException { super.close(); if (null != this.connection) { this.connection.close(); + this.connection = null; } } Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java?rev=812486&r1=812485&r2=812486&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java (original) +++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java Tue Sep 8 12:52:55 2009 @@ -42,7 +42,9 @@ import org.apache.hadoop.sqoop.ImportOptions; import 
org.apache.hadoop.sqoop.lib.FieldFormatter; import org.apache.hadoop.sqoop.lib.RecordParser; +import org.apache.hadoop.sqoop.util.DirectImportUtils; import org.apache.hadoop.sqoop.util.ImportError; +import org.apache.hadoop.sqoop.util.JdbcUrl; import org.apache.hadoop.sqoop.util.PerfCounters; import org.apache.hadoop.sqoop.util.StreamHandlerFactory; import org.apache.hadoop.util.Shell; @@ -348,19 +350,8 @@ String tmpDir = options.getTempDir(); File tempFile = File.createTempFile("mysql-cnf",".cnf", new File(tmpDir)); - // Set this file to be 0600. Java doesn't have a built-in mechanism for this - // so we need to go out to the shell to execute chmod. - ArrayList chmodArgs = new ArrayList(); - chmodArgs.add("chmod"); - chmodArgs.add("0600"); - chmodArgs.add(tempFile.toString()); - try { - Shell.execCommand("chmod", "0600", tempFile.toString()); - } catch (IOException ioe) { - // Shell.execCommand will throw IOException on exit code != 0. - LOG.error("Could not chmod 0600 " + tempFile.toString()); - throw new IOException("Could not ensure password file security.", ioe); - } + // Make the password file only private readable. + DirectImportUtils.setFilePermissions(tempFile, "0600"); // If we're here, the password file is believed to be ours alone. // The inability to set chmod 0600 inside Java is troublesome. We have to trust @@ -399,41 +390,18 @@ // Java doesn't respect arbitrary JDBC-based schemes. So we chop off the scheme // (everything before '://') and replace it with 'http', which we know will work. String connectString = options.getConnectString(); - String databaseName = null; - try { - String sanitizedString = null; - int schemeEndOffset = connectString.indexOf("://"); - if (-1 == schemeEndOffset) { - // couldn't find one? try our best here. 
- sanitizedString = "http://" + connectString; - LOG.warn("Could not find database access scheme in connect string " + connectString); - } else { - sanitizedString = "http" + connectString.substring(schemeEndOffset); - } - - URL connectUrl = new URL(sanitizedString); - databaseName = connectUrl.getPath(); - } catch (MalformedURLException mue) { - LOG.error("Malformed connect string URL: " + connectString - + "; reason is " + mue.toString()); - } + String databaseName = JdbcUrl.getDatabaseName(connectString); if (null == databaseName) { throw new ImportError("Could not determine database name"); } - // database name was found from the 'path' part of the URL; trim leading '/' - while (databaseName.startsWith("/")) { - databaseName = databaseName.substring(1); - } - LOG.info("Performing import of table " + tableName + " from database " + databaseName); args.add(MYSQL_DUMP_CMD); // requires that this is on the path. String password = options.getPassword(); String passwordFile = null; - Process p = null; StreamHandlerFactory streamHandler = null; PerfCounters counters = new PerfCounters(); @@ -475,27 +443,8 @@ LOG.debug(" " + arg); } - FileSystem fs = FileSystem.get(conf); - String warehouseDir = options.getWarehouseDir(); - Path destDir = null; - if (null != warehouseDir) { - destDir = new Path(new Path(warehouseDir), tableName); - } else { - destDir = new Path(tableName); - } - - LOG.debug("Writing to filesystem: " + conf.get("fs.default.name")); - LOG.debug("Creating destination directory " + destDir); - fs.mkdirs(destDir); - Path destFile = new Path(destDir, "data-00000"); - LOG.debug("Opening output file: " + destFile); - if (fs.exists(destFile)) { - Path canonicalDest = destFile.makeQualified(fs); - throw new IOException("Destination file " + canonicalDest + " already exists"); - } - // This writer will be closed by StreamHandlerFactory. 
- OutputStream os = fs.create(destFile); + OutputStream os = DirectImportUtils.createHdfsSink(conf, options, tableName); BufferedWriter w = new BufferedWriter(new OutputStreamWriter(os)); // Actually start the mysqldump. Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/PostgresqlManager.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/PostgresqlManager.java?rev=812486&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/PostgresqlManager.java (added) +++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/PostgresqlManager.java Tue Sep 8 12:52:55 2009 @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.sqoop.manager; + +import java.io.IOException; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.sqoop.ImportOptions; +import org.apache.hadoop.sqoop.util.ImportError; + +/** + * Manages connections to Postgresql databases + */ +public class PostgresqlManager extends GenericJdbcManager { + + public static final Log LOG = LogFactory.getLog(PostgresqlManager.class.getName()); + + // driver class to ensure is loaded when making db connection. + private static final String DRIVER_CLASS = "org.postgresql.Driver"; + + private static final int POSTGRESQL_FETCH_SIZE = 50; // Fetch 50 rows at a time. + + // set to true after we warn the user that we can use direct fastpath. + private static boolean warningPrinted = false; + + public PostgresqlManager(final ImportOptions opts) { + super(DRIVER_CLASS, opts); + } + + protected PostgresqlManager(final ImportOptions opts, boolean ignored) { + // constructor used by subclasses to avoid the --direct warning. + super(DRIVER_CLASS, opts); + } + + @Override + public void close() throws SQLException { + if (this.hasOpenConnection()) { + this.getConnection().commit(); // Commit any changes made thus far. + } + + super.close(); + } + + @Override + protected String getColNamesQuery(String tableName) { + // Use LIMIT to return fast + return "SELECT t.* FROM " + tableName + " AS t LIMIT 1"; + } + + @Override + public void importTable(String tableName, String jarFile, Configuration conf) + throws IOException, ImportError { + + // The user probably should have requested --direct to invoke psql. + // Display a warning informing them of this fact. 
+ if (!PostgresqlManager.warningPrinted) { + String connectString = options.getConnectString(); + + LOG.warn("It looks like you are importing from postgresql."); + LOG.warn("This transfer can be faster! Use the --direct"); + LOG.warn("option to exercise a postgresql-specific fast path."); + + PostgresqlManager.warningPrinted = true; // don't display this twice. + } + + // Then run the normal importTable() method. + super.importTable(tableName, jarFile, conf); + } + + @Override + public String getPrimaryKey(String tableName) { + // Postgresql stores table names using lower-case internally; need + // to always convert to lowercase before querying the metadata dictionary. + return super.getPrimaryKey(tableName.toLowerCase()); + } + + /** + * Executes an arbitrary SQL statement. Sets the cursor fetch size + * to ensure the entire table is not buffered in RAM before reading + * any rows. A consequence of this is that every ResultSet returned + * by this method *MUST* be close()'d, or read to exhaustion before + * another query can be executed from this ConnManager instance. + * + * @param stmt The SQL statement to execute + * @return A ResultSet encapsulating the results or null on error + */ + protected ResultSet execute(String stmt, Object... 
args) throws SQLException { + PreparedStatement statement = null; + statement = this.getConnection().prepareStatement(stmt, + ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + statement.setFetchSize(POSTGRESQL_FETCH_SIZE); + if (null != args) { + for (int i = 0; i < args.length; i++) { + statement.setObject(i + 1, args[i]); + } + } + + LOG.info("Executing SQL statement: " + stmt); + return statement.executeQuery(); + } +} + Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java?rev=812486&r1=812485&r2=812486&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java (original) +++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java Tue Sep 8 12:52:55 2009 @@ -98,6 +98,7 @@ } finally { try { results.close(); + getConnection().commit(); } catch (SQLException sqlE) { LOG.warn("SQLException closing ResultSet: " + sqlE.toString()); } @@ -146,6 +147,7 @@ } finally { try { results.close(); + getConnection().commit(); } catch (SQLException sqlE) { LOG.warn("SQLException closing ResultSet: " + sqlE.toString()); } @@ -216,6 +218,7 @@ if (null != results) { try { results.close(); + getConnection().commit(); } catch (SQLException sqlE) { LOG.warn("Exception closing ResultSet: " + sqlE.toString()); } @@ -240,6 +243,7 @@ } } finally { results.close(); + getConnection().commit(); } } catch (SQLException sqlException) { LOG.error("Error reading primary key metadata: " + sqlException.toString()); @@ -380,6 +384,7 @@ } finally { try { results.close(); + getConnection().commit(); } catch (SQLException sqlE) { LOG.warn("SQLException closing ResultSet: " + sqlE.toString()); } @@ -412,6 +417,7 @@ // We only use this for 
metadata queries. Loosest semantics are okay. connection.setTransactionIsolation(Connection.TRANSACTION_READ_UNCOMMITTED); + connection.setAutoCommit(false); return connection; } Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/DirectImportUtils.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/DirectImportUtils.java?rev=812486&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/DirectImportUtils.java (added) +++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/DirectImportUtils.java Tue Sep 8 12:52:55 2009 @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.sqoop.util; + +import java.io.IOException; +import java.io.File; +import java.io.OutputStream; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.sqoop.ImportOptions; +import org.apache.hadoop.util.Shell; + +/** + * Utility methods that are common to the various direct import managers. + */ +public final class DirectImportUtils { + + public static final Log LOG = LogFactory.getLog(DirectImportUtils.class.getName()); + + private DirectImportUtils() { + } + + /** + * Executes chmod on the specified file, passing in the mode string 'modstr' + * which may be e.g. "a+x" or "0600", etc. + * @throws IOException if chmod failed. + */ + public static void setFilePermissions(File file, String modstr) throws IOException { + // Set this file to be 0600. Java doesn't have a built-in mechanism for this + // so we need to go out to the shell to execute chmod. + try { + Shell.execCommand("chmod", modstr, file.toString()); + } catch (IOException ioe) { + // Shell.execCommand will throw IOException on exit code != 0. + LOG.error("Could not chmod " + modstr + " " + file.toString()); + throw new IOException("Could not ensure password file security.", ioe); + } + } + + /** + * Open a file in HDFS for write to hold the data associated with a table. + * Creates any necessary directories, and returns the OutputStream to write to. + * The caller is responsible for calling the close() method on the returned + * stream. 
+ */ + public static OutputStream createHdfsSink(Configuration conf, ImportOptions options, + String tableName) throws IOException { + + FileSystem fs = FileSystem.get(conf); + String warehouseDir = options.getWarehouseDir(); + Path destDir = null; + if (null != warehouseDir) { + destDir = new Path(new Path(warehouseDir), tableName); + } else { + destDir = new Path(tableName); + } + + LOG.debug("Writing to filesystem: " + conf.get("fs.default.name")); + LOG.debug("Creating destination directory " + destDir); + fs.mkdirs(destDir); + Path destFile = new Path(destDir, "data-00000"); + LOG.debug("Opening output file: " + destFile); + if (fs.exists(destFile)) { + Path canonicalDest = destFile.makeQualified(fs); + throw new IOException("Destination file " + canonicalDest + " already exists"); + } + + // This OutputStream will be closed by the caller. + return fs.create(destFile); + } +} + Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/JdbcUrl.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/JdbcUrl.java?rev=812486&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/JdbcUrl.java (added) +++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/JdbcUrl.java Tue Sep 8 12:52:55 2009 @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.sqoop.util; + +import java.net.MalformedURLException; +import java.net.URL; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * Some utilities for parsing JDBC URLs which may not be tolerated + * by Java's java.net.URL class. + * java.net.URL does not support multi:part:scheme:// components, which + * virtually all JDBC connect string URLs have. + */ +public final class JdbcUrl { + + public static final Log LOG = LogFactory.getLog(JdbcUrl.class.getName()); + + private JdbcUrl() { + } + + /** + * @return the database name from the connect string, which is typically the 'path' + * component, or null if we can't. + */ + public static String getDatabaseName(String connectString) { + try { + String sanitizedString = null; + int schemeEndOffset = connectString.indexOf("://"); + if (-1 == schemeEndOffset) { + // couldn't find one? try our best here. + sanitizedString = "http://" + connectString; + LOG.warn("Could not find database access scheme in connect string " + connectString); + } else { + sanitizedString = "http" + connectString.substring(schemeEndOffset); + } + + URL connectUrl = new URL(sanitizedString); + String databaseName = connectUrl.getPath(); + if (null == databaseName) { + return null; + } + + // This is taken from a 'path' part of a URL, which may have leading '/' + // characters; trim them off. 
+ while (databaseName.startsWith("/")) { + databaseName = databaseName.substring(1); + } + + return databaseName; + } catch (MalformedURLException mue) { + LOG.error("Malformed connect string URL: " + connectString + + "; reason is " + mue.toString()); + return null; + } + } + + /** + * @return the hostname from the connect string, or null if we can't. + */ + public static String getHostName(String connectString) { + try { + String sanitizedString = null; + int schemeEndOffset = connectString.indexOf("://"); + if (-1 == schemeEndOffset) { + // couldn't find one? ok, then there's no problem, it should work as a URL. + sanitizedString = connectString; + } else { + sanitizedString = "http" + connectString.substring(schemeEndOffset); + } + + URL connectUrl = new URL(sanitizedString); + return connectUrl.getHost(); + } catch (MalformedURLException mue) { + LOG.error("Malformed connect string URL: " + connectString + + "; reason is " + mue.toString()); + return null; + } + } + + /** + * @return the port from the connect string, or -1 if we can't. + */ + public static int getPort(String connectString) { + try { + String sanitizedString = null; + int schemeEndOffset = connectString.indexOf("://"); + if (-1 == schemeEndOffset) { + // couldn't find one? ok, then there's no problem, it should work as a URL. 
+ sanitizedString = connectString; + } else { + sanitizedString = "http" + connectString.substring(schemeEndOffset); + } + + URL connectUrl = new URL(sanitizedString); + return connectUrl.getPort(); + } catch (MalformedURLException mue) { + LOG.error("Malformed connect string URL: " + connectString + + "; reason is " + mue.toString()); + return -1; + } + } +} Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/ThirdPartyTests.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/ThirdPartyTests.java?rev=812486&r1=812485&r2=812486&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/ThirdPartyTests.java (original) +++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/ThirdPartyTests.java Tue Sep 8 12:52:55 2009 @@ -25,6 +25,7 @@ import org.apache.hadoop.sqoop.manager.LocalMySQLTest; import org.apache.hadoop.sqoop.manager.MySQLAuthTest; import org.apache.hadoop.sqoop.manager.OracleManagerTest; +import org.apache.hadoop.sqoop.manager.PostgresqlTest; /** * Test battery including all tests of vendor-specific ConnManager implementations. 
@@ -41,7 +42,8 @@ suite.addTestSuite(LocalMySQLTest.class); suite.addTestSuite(MySQLAuthTest.class); suite.addTestSuite(OracleManagerTest.class); - + suite.addTestSuite(PostgresqlTest.class); + return suite; } Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/LocalMySQLTest.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/LocalMySQLTest.java?rev=812486&r1=812485&r2=812486&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/LocalMySQLTest.java (original) +++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/LocalMySQLTest.java Tue Sep 8 12:52:55 2009 @@ -69,7 +69,7 @@ static final String HOST_URL = "jdbc:mysql://localhost/"; static final String MYSQL_DATABASE_NAME = "sqooptestdb"; - static final String TABLE_NAME = "EMPLOYEES"; + static final String TABLE_NAME = "EMPLOYEES_MYSQL"; static final String CONNECT_STRING = HOST_URL + MYSQL_DATABASE_NAME; // instance variables populated during setUp, used during tests Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/PostgresqlTest.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/PostgresqlTest.java?rev=812486&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/PostgresqlTest.java (added) +++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/PostgresqlTest.java Tue Sep 8 12:52:55 2009 @@ -0,0 +1,239 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.sqoop.manager; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.FileInputStream; +import java.io.File; +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.junit.Before; +import org.junit.Test; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.sqoop.ImportOptions; +import org.apache.hadoop.sqoop.testutil.ImportJobTestCase; +import org.apache.hadoop.sqoop.util.FileListing; + +/** + * Test the PostgresqlManager and DirectPostgresqlManager implementations. + * The former uses the postgres JDBC driver to perform an import; + * the latter uses pg_dump to facilitate it. + * + * Since this requires a Postgresql installation on your local machine to use, this + * class is named in such a way that Hadoop's default QA process does not run + * it. You need to run this manually with -Dtestcase=PostgresqlTest. + * + * You need to put Postgresql's JDBC driver library into a location where Hadoop + * can access it (e.g., $HADOOP_HOME/lib). 
+ * + * To configure a postgresql database to allow local connections, put the following + * in /etc/postgresql/8.3/main/pg_hba.conf: + * local all all trust + * host all all 127.0.0.1/32 trust + * + * ... and comment out any other lines referencing 127.0.0.1. + * + * For postgresql 8.1, this may be in /var/lib/pgsql/data, instead. + * You may need to restart the postgresql service after modifying this file. + * + * You should also create a sqooptest user and database: + * + * $ sudo -u postgres psql -U postgres template1 + * template1=> CREATE USER sqooptest; + * template1=> CREATE DATABASE sqooptest; + * template1=> \q + * + */ +public class PostgresqlTest extends ImportJobTestCase { + + public static final Log LOG = LogFactory.getLog(PostgresqlTest.class.getName()); + + static final String HOST_URL = "jdbc:postgresql://localhost/"; + + static final String DATABASE_USER = "sqooptest"; + static final String DATABASE_NAME = "sqooptest"; + static final String TABLE_NAME = "EMPLOYEES_PG"; + static final String CONNECT_STRING = HOST_URL + DATABASE_NAME; + + @Before + public void setUp() { + LOG.debug("Setting up another postgresql test..."); + + ImportOptions options = new ImportOptions(CONNECT_STRING, TABLE_NAME); + options.setUsername(DATABASE_USER); + + ConnManager manager = null; + Connection connection = null; + Statement st = null; + + try { + manager = new PostgresqlManager(options); + connection = manager.getConnection(); + connection.setAutoCommit(false); + st = connection.createStatement(); + + // create the database table and populate it with data. + + try { + // Try to remove the table first. DROP TABLE IF EXISTS didn't + // get added until pg 8.3, so we just use "DROP TABLE" and ignore + // any exception here if one occurs. + st.executeUpdate("DROP TABLE " + TABLE_NAME); + } catch (SQLException e) { + LOG.info("Couldn't drop table " + TABLE_NAME + " (ok)"); + LOG.info(e.toString()); + // Now we need to reset the transaction. 
+ connection.rollback(); + } + + st.executeUpdate("CREATE TABLE " + TABLE_NAME + " (" + + "id INT NOT NULL PRIMARY KEY, " + + "name VARCHAR(24) NOT NULL, " + + "start_date DATE, " + + "salary FLOAT, " + + "dept VARCHAR(32))"); + + st.executeUpdate("INSERT INTO " + TABLE_NAME + " VALUES(" + + "1,'Aaron','2009-05-14',1000000.00,'engineering')"); + st.executeUpdate("INSERT INTO " + TABLE_NAME + " VALUES(" + + "2,'Bob','2009-04-20',400.00,'sales')"); + st.executeUpdate("INSERT INTO " + TABLE_NAME + " VALUES(" + + "3,'Fred','2009-01-23',15.00,'marketing')"); + connection.commit(); + } catch (SQLException sqlE) { + LOG.error("Encountered SQL Exception: " + sqlE); + sqlE.printStackTrace(); + fail("SQLException when running test setUp(): " + sqlE); + } finally { + try { + if (null != st) { + st.close(); + } + + if (null != manager) { + manager.close(); + } + } catch (SQLException sqlE) { + LOG.warn("Got SQLException when closing connection: " + sqlE); + } + } + + LOG.debug("setUp complete."); + } + + + private String [] getArgv(boolean isDirect) { + ArrayList args = new ArrayList(); + + args.add("-D"); + args.add("fs.default.name=file:///"); + + args.add("--table"); + args.add(TABLE_NAME); + args.add("--warehouse-dir"); + args.add(getWarehouseDir()); + args.add("--connect"); + args.add(CONNECT_STRING); + args.add("--username"); + args.add(DATABASE_USER); + args.add("--where"); + args.add("id > 1"); + + if (isDirect) { + args.add("--direct"); + } + + return args.toArray(new String[0]); + } + + private void doImportAndVerify(boolean isDirect, String [] expectedResults) + throws IOException { + + Path warehousePath = new Path(this.getWarehouseDir()); + Path tablePath = new Path(warehousePath, TABLE_NAME); + + Path filePath; + if (isDirect) { + filePath = new Path(tablePath, "data-00000"); + } else { + filePath = new Path(tablePath, "part-00000"); + } + + File tableFile = new File(tablePath.toString()); + if (tableFile.exists() && tableFile.isDirectory()) { + // remove the 
directory before running the import. + FileListing.recursiveDeleteDir(tableFile); + } + + String [] argv = getArgv(isDirect); + try { + runImport(argv); + } catch (IOException ioe) { + LOG.error("Got IOException during import: " + ioe.toString()); + ioe.printStackTrace(); + fail(ioe.toString()); + } + + File f = new File(filePath.toString()); + assertTrue("Could not find imported data file, " + f, f.exists()); + BufferedReader r = null; + try { + // Read through the file and make sure it's all there. + r = new BufferedReader(new InputStreamReader(new FileInputStream(f))); + for (String expectedLine : expectedResults) { + assertEquals(expectedLine, r.readLine()); + } + } catch (IOException ioe) { + LOG.error("Got IOException verifying results: " + ioe.toString()); + ioe.printStackTrace(); + fail(ioe.toString()); + } finally { + IOUtils.closeStream(r); + } + } + + @Test + public void testJdbcBasedImport() throws IOException { + String [] expectedResults = { + "2,Bob,2009-04-20,400.0,sales", + "3,Fred,2009-01-23,15.0,marketing" + }; + + doImportAndVerify(false, expectedResults); + } + + @Test + public void testDirectImport() throws IOException { + String [] expectedResults = { + "2,Bob,2009-04-20,400,sales", + "3,Fred,2009-01-23,15,marketing" + }; + + doImportAndVerify(true, expectedResults); + } +} Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/testutil/ImportJobTestCase.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/testutil/ImportJobTestCase.java?rev=812486&r1=812485&r2=812486&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/testutil/ImportJobTestCase.java (original) +++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/testutil/ImportJobTestCase.java Tue Sep 8 12:52:55 2009 @@ -151,7 +151,9 @@ setCurTableName(null); // 
clear user-override table name. try { - manager.close(); + if (null != manager) { + manager.close(); + } } catch (SQLException sqlE) { LOG.error("Got SQLException: " + sqlE.toString()); fail("Got SQLException: " + sqlE.toString());