hadoop-mapreduce-commits mailing list archives

From tomwh...@apache.org
Subject svn commit: r812486 - in /hadoop/mapreduce/trunk: ./ src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/ src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/ src/contrib/sqoop/src/test/org...
Date Tue, 08 Sep 2009 12:52:55 GMT
Author: tomwhite
Date: Tue Sep  8 12:52:55 2009
New Revision: 812486

URL: http://svn.apache.org/viewvc?rev=812486&view=rev
Log:
MAPREDUCE-938. Postgresql support for Sqoop. Contributed by Aaron Kimball.
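
Based on the options exercised by the new PostgresqlTest (flags taken verbatim from the test; the Sqoop launcher invocation itself is elided here), an import against Postgresql looks roughly like:

    --connect jdbc:postgresql://localhost/sqooptest --username sqooptest \
        --table EMPLOYEES_PG --warehouse-dir <dir> --direct

Without --direct the import runs through the JDBC-based PostgresqlManager; with it, Sqoop shells out to psql via the new DirectPostgresqlManager.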

Added:
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/PostgresqlManager.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/DirectImportUtils.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/JdbcUrl.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/PostgresqlTest.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DefaultManagerFactory.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/GenericJdbcManager.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/ThirdPartyTests.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/LocalMySQLTest.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/testutil/ImportJobTestCase.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=812486&r1=812485&r2=812486&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Tue Sep  8 12:52:55 2009
@@ -91,6 +91,8 @@
     logs and generating job traces for simulation and analysis. (Dick King via
     cdouglas)
 
+    MAPREDUCE-938. Postgresql support for Sqoop. (Aaron Kimball via tomwhite)
+
   IMPROVEMENTS
 
     MAPREDUCE-816. Rename "local" mysql import to "direct" in Sqoop.

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DefaultManagerFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DefaultManagerFactory.java?rev=812486&r1=812485&r2=812486&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DefaultManagerFactory.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DefaultManagerFactory.java Tue Sep  8 12:52:55 2009
@@ -81,6 +81,12 @@
       } else {
         return new MySQLManager(options);
       }
+    } else if (scheme.equals("jdbc:postgresql:")) {
+      if (options.isDirect()) {
+        return new DirectPostgresqlManager(options);
+      } else {
+        return new PostgresqlManager(options);
+      }
     } else if (scheme.startsWith("jdbc:hsqldb:")) {
       return new HsqldbManager(options);
     } else if (scheme.startsWith("jdbc:oracle:")) {
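
For illustration, a Postgresql connect string now dispatches to the new managers as follows (a sketch of the branch added above; the neighboring branches are unchanged):

    jdbc:postgresql://localhost/sqooptest             -> PostgresqlManager
    jdbc:postgresql://localhost/sqooptest + --direct  -> DirectPostgresqlManager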

Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java?rev=812486&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java Tue Sep  8 12:52:55 2009
@@ -0,0 +1,472 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.manager;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.net.InetAddress;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.sqoop.ImportOptions;
+import org.apache.hadoop.sqoop.util.DirectImportUtils;
+import org.apache.hadoop.sqoop.util.Executor;
+import org.apache.hadoop.sqoop.util.ImportError;
+import org.apache.hadoop.sqoop.util.JdbcUrl;
+import org.apache.hadoop.sqoop.util.PerfCounters;
+import org.apache.hadoop.sqoop.util.StreamHandlerFactory;
+
+/**
+ * Manages direct dumps from Postgresql databases via psql COPY TO STDOUT
+ * commands.
+ */
+public class DirectPostgresqlManager extends PostgresqlManager {
+  public static final Log LOG = LogFactory.getLog(DirectPostgresqlManager.class.getName());
+
+  public DirectPostgresqlManager(final ImportOptions opts) {
+    // Inform superclass that we're overriding import method via alt. constructor.
+    super(opts, true);
+  }
+
+  private static final String PSQL_CMD = "psql";
+
+  /** Copies data directly into HDFS, adding the user's chosen line terminator
+      char to each record.
+    */
+  static class PostgresqlStreamHandlerFactory implements StreamHandlerFactory {
+    private final BufferedWriter writer;
+    private final PerfCounters counters;
+    private final ImportOptions options;
+
+    PostgresqlStreamHandlerFactory(final BufferedWriter w, final ImportOptions opts,
+        final PerfCounters ctrs) {
+      this.writer = w;
+      this.options = opts;
+      this.counters = ctrs;
+    }
+
+    private PostgresqlStreamThread child;
+
+    public void processStream(InputStream is) {
+      child = new PostgresqlStreamThread(is, writer, options, counters);
+      child.start();
+    }
+
+    public int join() throws InterruptedException {
+      child.join();
+      if (child.isErrored()) {
+        return 1;
+      } else {
+        return 0;
+      }
+    }
+
+    private static class PostgresqlStreamThread extends Thread {
+      public static final Log LOG = LogFactory.getLog(PostgresqlStreamThread.class.getName());
+
+      private final BufferedWriter writer;
+      private final InputStream stream;
+      private final ImportOptions options;
+      private final PerfCounters counters;
+
+      private boolean error;
+
+      PostgresqlStreamThread(final InputStream is, final BufferedWriter w,
+          final ImportOptions opts, final PerfCounters ctrs) {
+        this.stream = is;
+        this.writer = w;
+        this.options = opts;
+        this.counters = ctrs;
+      }
+
+      public boolean isErrored() {
+        return error;
+      }
+
+      public void run() {
+        BufferedReader r = null;
+        BufferedWriter w = this.writer;
+
+        char recordDelim = this.options.getOutputRecordDelim();
+
+        try {
+          r = new BufferedReader(new InputStreamReader(this.stream));
+
+          // read/write transfer loop here.
+          while (true) {
+            String inLine = r.readLine();
+            if (null == inLine) {
+              break; // EOF
+            }
+
+            w.write(inLine);
+            w.write(recordDelim);
+            counters.addBytes(1 + inLine.length());
+          }
+        } catch (IOException ioe) {
+          LOG.error("IOException reading from psql: " + ioe.toString());
+          // set the error bit so our caller can see that something went wrong.
+          error = true;
+        } finally {
+          if (null != r) {
+            try {
+              r.close();
+            } catch (IOException ioe) {
+              LOG.info("Error closing FIFO stream: " + ioe.toString());
+            }
+          }
+
+          if (null != w) {
+            try {
+              w.close();
+            } catch (IOException ioe) {
+              LOG.info("Error closing HDFS stream: " + ioe.toString());
+            }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+    Takes a list of columns and turns them into a string like "col1, col2, col3..."
+   */
+  private String getColumnListStr(String [] cols) {
+    if (null == cols) {
+      return null;
+    }
+
+    StringBuilder sb = new StringBuilder();
+    boolean first = true;
+    for (String col : cols) {
+      if (!first) {
+        sb.append(", ");
+      }
+      sb.append(col);
+      first = false;
+    }
+
+    return sb.toString();
+  }
+
+  /**
+   * @return the Postgresql-specific SQL command to copy the table ("COPY .... TO STDOUT")
+   */
+  private String getCopyCommand(String tableName) {
+    /*
+       Format of this command is:
+
+          COPY table(col, col....) TO STDOUT 
+      or  COPY ( query ) TO STDOUT
+        WITH DELIMITER 'fieldsep'
+        CSV
+        QUOTE 'quotechar'
+        ESCAPE 'escapechar'
+        FORCE QUOTE col, col, col....
+    */
+
+    StringBuilder sb = new StringBuilder();
+    String [] cols = getColumnNames(tableName);
+
+    sb.append("COPY ");
+    String whereClause = this.options.getWhereClause();
+    if (whereClause != null && whereClause.length() > 0) {
+      // Import from a SELECT QUERY
+      sb.append("(");
+      sb.append("SELECT ");
+      if (null != cols) {
+        sb.append(getColumnListStr(cols));
+      } else {
+        sb.append("*");
+      }
+
+      sb.append(" FROM ");
+      sb.append(tableName);
+      sb.append(" WHERE ");
+      sb.append(whereClause);
+      sb.append(")");
+    } else {
+      // Import just the table.
+      sb.append(tableName);
+      if (null != cols) {
+        // specify columns.
+        sb.append("(");
+        sb.append(getColumnListStr(cols));
+        sb.append(")");
+      }
+    }
+
+    // Translate delimiter characters to '\ooo' octal representation.
+    sb.append(" TO STDOUT WITH DELIMITER E'\\");
+    sb.append(Integer.toString((int) this.options.getOutputFieldDelim(), 8));
+    sb.append("' CSV ");
+    if (this.options.getOutputEnclosedBy() != '\0') {
+      sb.append("QUOTE E'\\");
+      sb.append(Integer.toString((int) this.options.getOutputEnclosedBy(), 8));
+      sb.append("' ");
+    }
+    if (this.options.getOutputEscapedBy() != '\0') {
+      sb.append("ESCAPE E'\\");
+      sb.append(Integer.toString((int) this.options.getOutputEscapedBy(), 8));
+      sb.append("' ");
+    }
+
+    // add the "FORCE QUOTE col, col, col..." clause if quotes are required.
+    if (null != cols && this.options.isOutputEncloseRequired()) {
+      sb.append("FORCE QUOTE ");
+      sb.append(getColumnListStr(cols));
+    }
+
+    sb.append(";");
+
+    String copyCmd = sb.toString();
+    LOG.debug("Copy command is " + copyCmd);
+    return copyCmd;
+  }
+
+  /** Write the COPY command to a temp file
+    * @return the filename we wrote to.
+    */
+  private String writeCopyCommand(String command) throws IOException {
+    String tmpDir = options.getTempDir();
+    File tempFile = File.createTempFile("tmp-",".sql", new File(tmpDir));
+    BufferedWriter w = new BufferedWriter(
+        new OutputStreamWriter(new FileOutputStream(tempFile)));
+    w.write(command);
+    w.newLine();
+    w.close();
+    return tempFile.toString();
+  }
+
+  /** Write the user's password to a file that is chmod 0600.
+      @return the filename.
+    */
+  private String writePasswordFile(String password) throws IOException {
+
+    String tmpDir = options.getTempDir();
+    File tempFile = File.createTempFile("pgpass",".pgpass", new File(tmpDir));
+    LOG.debug("Writing password to tempfile: " + tempFile);
+
+    // Make sure it's only readable by the current user.
+    DirectImportUtils.setFilePermissions(tempFile, "0600");
+
+    // Actually write the password data into the file.
+    BufferedWriter w = new BufferedWriter(
+        new OutputStreamWriter(new FileOutputStream(tempFile)));
+    w.write("*:*:*:*:" + password);
+    w.close();
+    return tempFile.toString();
+  }
+
+  /** @return true if someHost refers to localhost.
+   */
+  private boolean isLocalhost(String someHost) {
+    if (null == someHost) {
+      return false;
+    }
+
+    try {
+      InetAddress localHostAddr = InetAddress.getLocalHost();
+      InetAddress someAddr = InetAddress.getByName(someHost);
+
+      return localHostAddr.equals(someAddr);
+    } catch (UnknownHostException uhe) {
+      return false;
+    }
+  }
+
+  @Override
+  /**
+   * Import the table into HDFS by using psql to pull the data out of the db
+   * via COPY ... TO STDOUT.
+   */
+  public void importTable(String tableName, String jarFile, Configuration conf)
+    throws IOException, ImportError {
+
+    LOG.info("Beginning psql fast path import");
+
+    if (options.getFileLayout() != ImportOptions.FileLayout.TextFile) {
+      // TODO(aaron): Support SequenceFile-based load-in
+      LOG.warn("File import layout " + options.getFileLayout()
+          + " is not supported by");
+      LOG.warn("Postgresql direct import; import will proceed as text files.");
+    }
+
+    String commandFilename = null;
+    String passwordFilename = null;
+    Process p = null;
+    StreamHandlerFactory streamHandler = null;
+    PerfCounters counters = new PerfCounters();
+
+    try {
+      // Get the COPY TABLE command to issue, write this to a file, and pass it
+      // in to psql with -f filename.
+      // Then make sure we delete this file in our finally block.
+      String copyCmd = getCopyCommand(tableName);
+      commandFilename = writeCopyCommand(copyCmd);
+
+      // Arguments to pass to psql on the command line.
+      ArrayList<String> args = new ArrayList<String>();
+
+      // Environment to pass to psql.
+      List<String> envp = Executor.getCurEnvpStrings();
+
+      // We need to parse the connect string URI to determine the database name
+      // and the host and port. If the host is localhost and the port is not specified,
+      // we don't want to pass this to psql, because we want to force the use of a
+      // UNIX domain socket, not a TCP/IP socket.
+      String connectString = options.getConnectString();
+      String databaseName = JdbcUrl.getDatabaseName(connectString);
+      String hostname = JdbcUrl.getHostName(connectString);
+      int port = JdbcUrl.getPort(connectString);
+
+      if (null == databaseName) {
+        throw new ImportError("Could not determine database name");
+      }
+
+      LOG.info("Performing import of table " + tableName + " from database " + databaseName);
+      args.add(PSQL_CMD); // requires that this is on the path.
+      args.add("--tuples-only");
+      args.add("--quiet");
+
+      String username = options.getUsername();
+      if (username != null) {
+        args.add("--username");
+        args.add(username);
+        String password = options.getPassword();
+        if (null != password) {
+          passwordFilename = writePasswordFile(password);
+          // Need to send PGPASSFILE environment variable specifying
+          // location of our postgres file.
+          envp.add("PGPASSFILE=" + passwordFilename);
+        }
+      }
+
+      if (!isLocalhost(hostname) || port != -1) {
+        args.add("--host");
+        args.add(hostname);
+        args.add("--port");
+        args.add(Integer.toString(port));
+      }
+
+      if (null != databaseName && databaseName.length() > 0) {
+        args.add(databaseName);
+      }
+
+      // The COPY command is in a script file.
+      args.add("-f");
+      args.add(commandFilename);
+
+      // begin the import in an external process.
+      LOG.debug("Starting psql with arguments:");
+      for (String arg : args) {
+        LOG.debug("  " + arg);
+      }
+
+      // This writer will be closed by StreamHandlerFactory.
+      OutputStream os = DirectImportUtils.createHdfsSink(conf, options, tableName);
+      BufferedWriter w = new BufferedWriter(new OutputStreamWriter(os));
+
+      // Actually start the psql dump.
+      p = Runtime.getRuntime().exec(args.toArray(new String[0]), envp.toArray(new String[0]));
+
+      // read from the stdout pipe into the HDFS writer.
+      InputStream is = p.getInputStream();
+      streamHandler = new PostgresqlStreamHandlerFactory(w, options, counters);
+
+      LOG.debug("Starting stream handler");
+      counters.startClock();
+      streamHandler.processStream(is);
+    } finally {
+      // block until the process is done.
+      LOG.debug("Waiting for process completion");
+      int result = 0;
+      if (null != p) {
+        while (true) {
+          try {
+            result = p.waitFor();
+          } catch (InterruptedException ie) {
+            // interrupted; loop around.
+            continue;
+          }
+
+          break;
+        }
+      }
+
+      // Remove any password file we wrote
+      if (null != passwordFilename) {
+        if (!new File(passwordFilename).delete()) {
+          LOG.error("Could not remove postgresql password file " + passwordFilename);
+          LOG.error("You should remove this file to protect your credentials.");
+        }
+      }
+
+      if (null != commandFilename) {
+        // We wrote the COPY command to a tmpfile. Remove it.
+        if (!new File(commandFilename).delete()) {
+          LOG.info("Could not remove temp file: " + commandFilename);
+        }
+      }
+
+      // block until the stream handler is done too.
+      int streamResult = 0;
+      if (null != streamHandler) {
+        while (true) {
+          try {
+            streamResult = streamHandler.join();
+          } catch (InterruptedException ie) {
+            // interrupted; loop around.
+            continue;
+          }
+
+          break;
+        }
+      }
+
+      LOG.info("Transfer loop complete.");
+
+      if (0 != result) {
+        throw new IOException("psql terminated with status "
+            + Integer.toString(result));
+      }
+
+      if (0 != streamResult) {
+        throw new IOException("Encountered exception in stream handler");
+      }
+
+      counters.stopClock();
+      LOG.info("Transferred " + counters.toString());
+    }
+  }
+}
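
For concreteness, a sketch of what getCopyCommand() emits for the table used by the new PostgresqlTest below, assuming Sqoop's default ',' field delimiter (octal 54) and no enclosing, escaping, or WHERE clause:

    COPY EMPLOYEES_PG(id, name, start_date, salary, dept) TO STDOUT WITH DELIMITER E'\54' CSV ;

psql executes this from the temp script file via -f, and the stream handler thread appends the record delimiter to each line as it copies psql's stdout into HDFS.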

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/GenericJdbcManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/GenericJdbcManager.java?rev=812486&r1=812485&r2=812486&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/GenericJdbcManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/GenericJdbcManager.java Tue Sep  8 12:52:55 2009
@@ -56,10 +56,15 @@
     return this.connection;
   }
 
+  protected boolean hasOpenConnection() {
+    return this.connection != null;
+  }
+
   public void close() throws SQLException {
     super.close();
     if (null != this.connection) {
       this.connection.close();
+      this.connection = null;
     }
   }
 

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java?rev=812486&r1=812485&r2=812486&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java Tue Sep  8 12:52:55 2009
@@ -42,7 +42,9 @@
 import org.apache.hadoop.sqoop.ImportOptions;
 import org.apache.hadoop.sqoop.lib.FieldFormatter;
 import org.apache.hadoop.sqoop.lib.RecordParser;
+import org.apache.hadoop.sqoop.util.DirectImportUtils;
 import org.apache.hadoop.sqoop.util.ImportError;
+import org.apache.hadoop.sqoop.util.JdbcUrl;
 import org.apache.hadoop.sqoop.util.PerfCounters;
 import org.apache.hadoop.sqoop.util.StreamHandlerFactory;
 import org.apache.hadoop.util.Shell;
@@ -348,19 +350,8 @@
     String tmpDir = options.getTempDir();
     File tempFile = File.createTempFile("mysql-cnf",".cnf", new File(tmpDir));
 
-    // Set this file to be 0600. Java doesn't have a built-in mechanism for this
-    // so we need to go out to the shell to execute chmod.
-    ArrayList<String> chmodArgs = new ArrayList<String>();
-    chmodArgs.add("chmod");
-    chmodArgs.add("0600");
-    chmodArgs.add(tempFile.toString());
-    try {
-      Shell.execCommand("chmod", "0600", tempFile.toString());
-    } catch (IOException ioe) {
-      // Shell.execCommand will throw IOException on exit code != 0.
-      LOG.error("Could not chmod 0600 " + tempFile.toString());
-      throw new IOException("Could not ensure password file security.", ioe);
-    }
+    // Make the password file only private readable.
+    DirectImportUtils.setFilePermissions(tempFile, "0600");
 
     // If we're here, the password file is believed to be ours alone.
     // The inability to set chmod 0600 inside Java is troublesome. We have to trust
@@ -399,41 +390,18 @@
     // Java doesn't respect arbitrary JDBC-based schemes. So we chop off the scheme
     // (everything before '://') and replace it with 'http', which we know will work.
     String connectString = options.getConnectString();
-    String databaseName = null;
-    try {
-      String sanitizedString = null;
-      int schemeEndOffset = connectString.indexOf("://");
-      if (-1 == schemeEndOffset) {
-        // couldn't find one? try our best here.
-        sanitizedString = "http://" + connectString;
-        LOG.warn("Could not find database access scheme in connect string " + connectString);
-      } else {
-        sanitizedString = "http" + connectString.substring(schemeEndOffset);
-      }
-
-      URL connectUrl = new URL(sanitizedString);
-      databaseName = connectUrl.getPath();
-    } catch (MalformedURLException mue) {
-      LOG.error("Malformed connect string URL: " + connectString
-          + "; reason is " + mue.toString());
-    }
+    String databaseName = JdbcUrl.getDatabaseName(connectString);
 
     if (null == databaseName) {
       throw new ImportError("Could not determine database name");
     }
 
-    // database name was found from the 'path' part of the URL; trim leading '/'
-    while (databaseName.startsWith("/")) {
-      databaseName = databaseName.substring(1);
-    }
-
     LOG.info("Performing import of table " + tableName + " from database " + databaseName);
     args.add(MYSQL_DUMP_CMD); // requires that this is on the path.
 
     String password = options.getPassword();
     String passwordFile = null;
 
-    
     Process p = null;
     StreamHandlerFactory streamHandler = null;
     PerfCounters counters = new PerfCounters();
@@ -475,27 +443,8 @@
         LOG.debug("  " + arg);
       }
 
-      FileSystem fs = FileSystem.get(conf);
-      String warehouseDir = options.getWarehouseDir();
-      Path destDir = null;
-      if (null != warehouseDir) {
-        destDir = new Path(new Path(warehouseDir), tableName);
-      } else {
-        destDir = new Path(tableName);
-      }
-
-      LOG.debug("Writing to filesystem: " + conf.get("fs.default.name"));
-      LOG.debug("Creating destination directory " + destDir);
-      fs.mkdirs(destDir);
-      Path destFile = new Path(destDir, "data-00000");
-      LOG.debug("Opening output file: " + destFile);
-      if (fs.exists(destFile)) {
-        Path canonicalDest = destFile.makeQualified(fs);
-        throw new IOException("Destination file " + canonicalDest + " already exists");
-      }
-
       // This writer will be closed by StreamHandlerFactory.
-      OutputStream os = fs.create(destFile);
+      OutputStream os = DirectImportUtils.createHdfsSink(conf, options, tableName);
       BufferedWriter w = new BufferedWriter(new OutputStreamWriter(os));
 
       // Actually start the mysqldump.

Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/PostgresqlManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/PostgresqlManager.java?rev=812486&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/PostgresqlManager.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/PostgresqlManager.java Tue Sep  8 12:52:55 2009
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.manager;
+
+import java.io.IOException;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.sqoop.ImportOptions;
+import org.apache.hadoop.sqoop.util.ImportError;
+
+/**
+ * Manages connections to Postgresql databases
+ */
+public class PostgresqlManager extends GenericJdbcManager {
+
+  public static final Log LOG = LogFactory.getLog(PostgresqlManager.class.getName());
+
+  // Driver class that must be loaded when making a db connection.
+  private static final String DRIVER_CLASS = "org.postgresql.Driver";
+
+  private static final int POSTGRESQL_FETCH_SIZE = 50; // Fetch 50 rows at a time.
+
+  // Set to true after we warn the user that they can use the direct fast path.
+  private static boolean warningPrinted = false;
+
+  public PostgresqlManager(final ImportOptions opts) {
+    super(DRIVER_CLASS, opts);
+  }
+
+  protected PostgresqlManager(final ImportOptions opts, boolean ignored) {
+    // constructor used by subclasses to avoid the --direct warning.
+    super(DRIVER_CLASS, opts);
+  }
+
+  @Override
+  public void close() throws SQLException {
+    if (this.hasOpenConnection()) {
+      this.getConnection().commit(); // Commit any changes made thus far.
+    }
+
+    super.close();
+  }
+
+  @Override
+  protected String getColNamesQuery(String tableName) {
+    // Use LIMIT to return fast
+    return "SELECT t.* FROM " + tableName + " AS t LIMIT 1";
+  }
+
+  @Override
+  public void importTable(String tableName, String jarFile, Configuration conf)
+        throws IOException, ImportError {
+
+    // The user probably should have requested --direct to invoke the psql
+    // COPY fast path. Display a warning informing them of this fact.
+    if (!PostgresqlManager.warningPrinted) {
+      LOG.warn("It looks like you are importing from postgresql.");
+      LOG.warn("This transfer can be faster! Use the --direct");
+      LOG.warn("option to exercise a postgresql-specific fast path.");
+
+      PostgresqlManager.warningPrinted = true; // don't display this twice.
+    }
+
+    // Then run the normal importTable() method.
+    super.importTable(tableName, jarFile, conf);
+  }
+
+  @Override
+  public String getPrimaryKey(String tableName) {
+    // Postgresql stores table names using lower-case internally; need
+    // to always convert to lowercase before querying the metadata dictionary.
+    return super.getPrimaryKey(tableName.toLowerCase());
+  }
+
+  /**
+   * Executes an arbitrary SQL statement. Sets the cursor fetch size
+   * to ensure the entire table is not buffered in RAM before reading
+   * any rows. A consequence of this is that every ResultSet returned
+   * by this method *MUST* be close()'d, or read to exhaustion before
+   * another query can be executed from this ConnManager instance.
+   *
+   * @param stmt The SQL statement to execute
+   * @return A ResultSet encapsulating the results or null on error
+   */
+  protected ResultSet execute(String stmt, Object... args) throws SQLException {
+    PreparedStatement statement = null;
+    statement = this.getConnection().prepareStatement(stmt,
+        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+    statement.setFetchSize(POSTGRESQL_FETCH_SIZE);
+    if (null != args) {
+      for (int i = 0; i < args.length; i++) {
+        statement.setObject(i + 1, args[i]);
+      }
+    }
+
+    LOG.info("Executing SQL statement: " + stmt);
+    return statement.executeQuery();
+  }
+}
+
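
A note on the getPrimaryKey() override above: Postgresql folds unquoted identifiers to lower case, so a table created as EMPLOYEES_PG lands in the catalog as employees_pg, and metadata lookups must use the folded name. A sketch:

    CREATE TABLE EMPLOYEES_PG (id INT NOT NULL PRIMARY KEY, name VARCHAR(24));
    -- catalog entry is employees_pg, so getPrimaryKey("EMPLOYEES_PG")
    -- queries the metadata for "employees_pg".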

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java?rev=812486&r1=812485&r2=812486&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java Tue Sep  8 12:52:55 2009
@@ -98,6 +98,7 @@
     } finally {
       try {
         results.close();
+        getConnection().commit();
       } catch (SQLException sqlE) {
         LOG.warn("SQLException closing ResultSet: " + sqlE.toString());
       }
@@ -146,6 +147,7 @@
     } finally {
       try {
         results.close();
+        getConnection().commit();
       } catch (SQLException sqlE) { 
         LOG.warn("SQLException closing ResultSet: " + sqlE.toString());
       }
@@ -216,6 +218,7 @@
       if (null != results) {
         try {
           results.close();
+          getConnection().commit();
         } catch (SQLException sqlE) {
           LOG.warn("Exception closing ResultSet: " + sqlE.toString());
         }
@@ -240,6 +243,7 @@
         }
       } finally {
         results.close();
+        getConnection().commit();
       }
     } catch (SQLException sqlException) {
       LOG.error("Error reading primary key metadata: " + sqlException.toString());
@@ -380,6 +384,7 @@
     } finally {
       try {
         results.close();
+        getConnection().commit();
       } catch (SQLException sqlE) {
         LOG.warn("SQLException closing ResultSet: " + sqlE.toString());
       }
@@ -412,6 +417,7 @@
 
     // We only use this for metadata queries. Loosest semantics are okay.
     connection.setTransactionIsolation(Connection.TRANSACTION_READ_UNCOMMITTED);
+    connection.setAutoCommit(false);
 
     return connection;
   }
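
For context on the setAutoCommit(false) and commit() additions above: with the Postgresql JDBC driver, a statement fetch size generally streams rows through a cursor only while autocommit is off, and with autocommit off every query leaves an implicit transaction open that must be committed. A minimal sketch of the resulting pattern (standard java.sql imports assumed; the URL is illustrative):

    Connection conn = DriverManager.getConnection("jdbc:postgresql://localhost/sqooptest");
    conn.setAutoCommit(false);              // required for cursor-based (streaming) fetch
    PreparedStatement st = conn.prepareStatement("SELECT * FROM employees_pg",
        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
    st.setFetchSize(50);                    // fetch 50 rows at a time instead of buffering all
    ResultSet rs = st.executeQuery();
    while (rs.next()) { /* consume row */ }
    rs.close();
    conn.commit();                          // end the implicit transaction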

Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/DirectImportUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/DirectImportUtils.java?rev=812486&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/DirectImportUtils.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/DirectImportUtils.java Tue Sep  8 12:52:55 2009
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.util;
+
+import java.io.IOException;
+import java.io.File;
+import java.io.OutputStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.sqoop.ImportOptions;
+import org.apache.hadoop.util.Shell;
+
+/**
+ * Utility methods that are common to the various direct import managers.
+ */
+public final class DirectImportUtils {
+
+  public static final Log LOG = LogFactory.getLog(DirectImportUtils.class.getName());
+
+  private DirectImportUtils() {
+  }
+
+  /**
+   * Executes chmod on the specified file, passing in the mode string 'modstr'
+   * which may be e.g. "a+x" or "0600", etc.
+   * @throws IOException if chmod failed.
+   */
+  public static void setFilePermissions(File file, String modstr) throws IOException {
+    // Set this file to be 0600. Java doesn't have a built-in mechanism for this
+    // so we need to go out to the shell to execute chmod.
+    try {
+      Shell.execCommand("chmod", modstr, file.toString());
+    } catch (IOException ioe) {
+      // Shell.execCommand will throw IOException on exit code != 0.
+      LOG.error("Could not chmod " + modstr + " " + file.toString());
+      throw new IOException("Could not ensure password file security.", ioe);
+    }
+  }
+
+  /**
+   * Open a file in HDFS for write to hold the data associated with a table.
+   * Creates any necessary directories, and returns the OutputStream to write to.
+   * The caller is responsible for calling the close() method on the returned
+   * stream.
+   */
+  public static OutputStream createHdfsSink(Configuration conf, ImportOptions options,
+      String tableName) throws IOException {
+
+    FileSystem fs = FileSystem.get(conf);
+    String warehouseDir = options.getWarehouseDir();
+    Path destDir = null;
+    if (null != warehouseDir) {
+      destDir = new Path(new Path(warehouseDir), tableName);
+    } else {
+      destDir = new Path(tableName);
+    }
+
+    LOG.debug("Writing to filesystem: " + conf.get("fs.default.name"));
+    LOG.debug("Creating destination directory " + destDir);
+    fs.mkdirs(destDir);
+    Path destFile = new Path(destDir, "data-00000");
+    LOG.debug("Opening output file: " + destFile);
+    if (fs.exists(destFile)) {
+      Path canonicalDest = destFile.makeQualified(fs);
+      throw new IOException("Destination file " + canonicalDest + " already exists");
+    }
+
+    // This OutputStream will be closed by the caller.
+    return fs.create(destFile);
+  }
+}
+
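
A short usage sketch of the two helpers, mirroring how the managers call them (tmpDir, conf, and options are assumed to come from the surrounding ImportOptions/Configuration):

    File pwFile = File.createTempFile("pgpass", ".pgpass", new File(tmpDir));
    DirectImportUtils.setFilePermissions(pwFile, "0600");   // shells out to chmod
    OutputStream os = DirectImportUtils.createHdfsSink(conf, options, "EMPLOYEES_PG");
    // ... stream records into os ...
    os.close();                                             // caller owns the stream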

Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/JdbcUrl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/JdbcUrl.java?rev=812486&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/JdbcUrl.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/JdbcUrl.java Tue Sep  8 12:52:55 2009
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.util;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Some utilities for parsing JDBC URLs which may not be tolerated
+ * by Java's java.net.URL class.
+ * java.net.URL does not support multi:part:scheme:// components, which
+ * virtually all JDBC connect string URLs have.
+ */
+public final class JdbcUrl {
+
+  public static final Log LOG = LogFactory.getLog(JdbcUrl.class.getName());
+
+  private JdbcUrl() {
+  }
+
+  /**
+   * @return the database name from the connect string, which is typically the 'path'
+   * component, or null if it cannot be determined.
+   */
+  public static String getDatabaseName(String connectString) {
+    try {
+      String sanitizedString = null;
+      int schemeEndOffset = connectString.indexOf("://");
+      if (-1 == schemeEndOffset) {
+        // couldn't find one? try our best here.
+        sanitizedString = "http://" + connectString;
+        LOG.warn("Could not find database access scheme in connect string " + connectString);
+      } else {
+        sanitizedString = "http" + connectString.substring(schemeEndOffset);
+      }
+
+      URL connectUrl = new URL(sanitizedString);
+      String databaseName = connectUrl.getPath();
+      if (null == databaseName) {
+        return null;
+      }
+
+      // This is taken from a 'path' part of a URL, which may have leading '/'
+      // characters; trim them off.
+      while (databaseName.startsWith("/")) {
+        databaseName = databaseName.substring(1);
+      }
+
+      return databaseName;
+    } catch (MalformedURLException mue) {
+      LOG.error("Malformed connect string URL: " + connectString
+          + "; reason is " + mue.toString());
+      return null;
+    }
+  }
+
+  /**
+   * @return the hostname from the connect string, or null if it cannot be determined.
+   */
+  public static String getHostName(String connectString) {
+    try {
+      String sanitizedString = null;
+      int schemeEndOffset = connectString.indexOf("://");
+      if (-1 == schemeEndOffset) {
+        // couldn't find one? ok, then there's no problem, it should work as a URL.
+        sanitizedString = connectString;
+      } else {
+        sanitizedString = "http" + connectString.substring(schemeEndOffset);
+      }
+
+      URL connectUrl = new URL(sanitizedString);
+      return connectUrl.getHost();
+    } catch (MalformedURLException mue) {
+      LOG.error("Malformed connect string URL: " + connectString
+          + "; reason is " + mue.toString());
+      return null;
+    }
+  }
+
+  /**
+   * @return the port from the connect string, or -1 if it cannot be determined.
+   */
+  public static int getPort(String connectString) {
+    try {
+      String sanitizedString = null;
+      int schemeEndOffset = connectString.indexOf("://");
+      if (-1 == schemeEndOffset) {
+        // couldn't find one? ok, then there's no problem, it should work as a URL.
+        sanitizedString = connectString;
+      } else {
+        sanitizedString = "http" + connectString.substring(schemeEndOffset);
+      }
+
+      URL connectUrl = new URL(sanitizedString);
+      return connectUrl.getPort();
+    } catch (MalformedURLException mue) {
+      LOG.error("Malformed connect string URL: " + connectString
+          + "; reason is " + mue.toString());
+      return -1;
+    }
+  }
+}
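
For example, applied to the connect string used by the new Postgresql tests:

    JdbcUrl.getDatabaseName("jdbc:postgresql://localhost/sqooptest")  // -> "sqooptest"
    JdbcUrl.getHostName("jdbc:postgresql://localhost/sqooptest")      // -> "localhost"
    JdbcUrl.getPort("jdbc:postgresql://localhost/sqooptest")          // -> -1 (no explicit port)

DirectPostgresqlManager relies on the localhost hostname plus the -1 port to omit --host/--port and force psql onto a UNIX domain socket.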

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/ThirdPartyTests.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/ThirdPartyTests.java?rev=812486&r1=812485&r2=812486&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/ThirdPartyTests.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/ThirdPartyTests.java Tue Sep  8 12:52:55 2009
@@ -25,6 +25,7 @@
 import org.apache.hadoop.sqoop.manager.LocalMySQLTest;
 import org.apache.hadoop.sqoop.manager.MySQLAuthTest;
 import org.apache.hadoop.sqoop.manager.OracleManagerTest;
+import org.apache.hadoop.sqoop.manager.PostgresqlTest;
 
 /**
  * Test battery including all tests of vendor-specific ConnManager implementations.
@@ -41,7 +42,8 @@
     suite.addTestSuite(LocalMySQLTest.class);
     suite.addTestSuite(MySQLAuthTest.class);
     suite.addTestSuite(OracleManagerTest.class);
-    
+    suite.addTestSuite(PostgresqlTest.class);
+
     return suite;
   }
 

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/LocalMySQLTest.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/LocalMySQLTest.java?rev=812486&r1=812485&r2=812486&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/LocalMySQLTest.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/LocalMySQLTest.java Tue Sep  8 12:52:55 2009
@@ -69,7 +69,7 @@
   static final String HOST_URL = "jdbc:mysql://localhost/";
 
   static final String MYSQL_DATABASE_NAME = "sqooptestdb";
-  static final String TABLE_NAME = "EMPLOYEES";
+  static final String TABLE_NAME = "EMPLOYEES_MYSQL";
   static final String CONNECT_STRING = HOST_URL + MYSQL_DATABASE_NAME;
 
   // instance variables populated during setUp, used during tests

Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/PostgresqlTest.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/PostgresqlTest.java?rev=812486&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/PostgresqlTest.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/PostgresqlTest.java Tue Sep  8 12:52:55 2009
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.manager;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.FileInputStream;
+import java.io.File;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.sqoop.ImportOptions;
+import org.apache.hadoop.sqoop.testutil.ImportJobTestCase;
+import org.apache.hadoop.sqoop.util.FileListing;
+
+/**
+ * Test the PostgresqlManager and DirectPostgresqlManager implementations.
+ * The former uses the postgres JDBC driver to perform an import;
+ * the latter uses psql's COPY ... TO STDOUT to do the same.
+ *
+ * Since this requires a Postgresql installation on your local machine to use, this
+ * class is named in such a way that Hadoop's default QA process does not run
+ * it. You need to run this manually with -Dtestcase=PostgresqlTest.
+ *
+ * You need to put Postgresql's JDBC driver library into a location where Hadoop
+ * can access it (e.g., $HADOOP_HOME/lib).
+ *
+ * To configure a postgresql database to allow local connections, put the following
+ * in /etc/postgresql/8.3/main/pg_hba.conf:
+ *     local  all all trust
+ *     host all all 127.0.0.1/32 trust
+ * 
+ * ... and comment out any other lines referencing 127.0.0.1.
+ *
+ * For postgresql 8.1, this may be in /var/lib/pgsql/data instead.
+ * You may need to restart the postgresql service after modifying this file.
+ *
+ * You should also create a sqooptest user and database:
+ *
+ * $ sudo -u postgres psql -U postgres template1
+ * template1=> CREATE USER sqooptest;
+ * template1=> CREATE DATABASE sqooptest;
+ * template1=> \q
+ *
+ */
+public class PostgresqlTest extends ImportJobTestCase {
+
+  public static final Log LOG = LogFactory.getLog(PostgresqlTest.class.getName());
+
+  static final String HOST_URL = "jdbc:postgresql://localhost/";
+
+  static final String DATABASE_USER = "sqooptest";
+  static final String DATABASE_NAME = "sqooptest";
+  static final String TABLE_NAME = "EMPLOYEES_PG";
+  static final String CONNECT_STRING = HOST_URL + DATABASE_NAME;
+
+  @Before
+  public void setUp() {
+    LOG.debug("Setting up another postgresql test...");
+
+    ImportOptions options = new ImportOptions(CONNECT_STRING, TABLE_NAME);
+    options.setUsername(DATABASE_USER);
+
+    ConnManager manager = null;
+    Connection connection = null;
+    Statement st = null;
+
+    try {
+      manager = new PostgresqlManager(options);
+      connection = manager.getConnection();
+      connection.setAutoCommit(false);
+      st = connection.createStatement();
+
+      // create the database table and populate it with data. 
+
+      try {
+        // Try to remove the table first. DROP TABLE IF EXISTS didn't
+        // get added until pg 8.3, so we just use "DROP TABLE" and ignore
+        // any exception here if one occurs.
+        st.executeUpdate("DROP TABLE " + TABLE_NAME);
+      } catch (SQLException e) {
+        LOG.info("Couldn't drop table " + TABLE_NAME + " (ok)");
+        LOG.info(e.toString());
+        // Now we need to reset the transaction.
+        connection.rollback();
+      }
+
+      st.executeUpdate("CREATE TABLE " + TABLE_NAME + " ("
+          + "id INT NOT NULL PRIMARY KEY, "
+          + "name VARCHAR(24) NOT NULL, "
+          + "start_date DATE, "
+          + "salary FLOAT, "
+          + "dept VARCHAR(32))");
+
+      st.executeUpdate("INSERT INTO " + TABLE_NAME + " VALUES("
+          + "1,'Aaron','2009-05-14',1000000.00,'engineering')");
+      st.executeUpdate("INSERT INTO " + TABLE_NAME + " VALUES("
+          + "2,'Bob','2009-04-20',400.00,'sales')");
+      st.executeUpdate("INSERT INTO " + TABLE_NAME + " VALUES("
+          + "3,'Fred','2009-01-23',15.00,'marketing')");
+      connection.commit();
+    } catch (SQLException sqlE) {
+      LOG.error("Encountered SQL Exception: " + sqlE);
+      sqlE.printStackTrace();
+      fail("SQLException when running test setUp(): " + sqlE);
+    } finally {
+      try {
+        if (null != st) {
+          st.close();
+        }
+
+        if (null != manager) {
+          manager.close();
+        }
+      } catch (SQLException sqlE) {
+        LOG.warn("Got SQLException when closing connection: " + sqlE);
+      }
+    }
+
+    LOG.debug("setUp complete.");
+  }
+
+
+  private String [] getArgv(boolean isDirect) {
+    ArrayList<String> args = new ArrayList<String>();
+
+    args.add("-D");
+    args.add("fs.default.name=file:///");
+
+    args.add("--table");
+    args.add(TABLE_NAME);
+    args.add("--warehouse-dir");
+    args.add(getWarehouseDir());
+    args.add("--connect");
+    args.add(CONNECT_STRING);
+    args.add("--username");
+    args.add(DATABASE_USER);
+    args.add("--where");
+    args.add("id > 1");
+
+    if (isDirect) {
+      args.add("--direct");
+    }
+
+    return args.toArray(new String[0]);
+  }
+
+  private void doImportAndVerify(boolean isDirect, String [] expectedResults)
+      throws IOException {
+
+    Path warehousePath = new Path(this.getWarehouseDir());
+    Path tablePath = new Path(warehousePath, TABLE_NAME);
+
+    Path filePath;
+    if (isDirect) {
+      filePath = new Path(tablePath, "data-00000");
+    } else {
+      filePath = new Path(tablePath, "part-00000");
+    }
+
+    File tableFile = new File(tablePath.toString());
+    if (tableFile.exists() && tableFile.isDirectory()) {
+      // remove the directory before running the import.
+      FileListing.recursiveDeleteDir(tableFile);
+    }
+
+    String [] argv = getArgv(isDirect);
+    try {
+      runImport(argv);
+    } catch (IOException ioe) {
+      LOG.error("Got IOException during import: " + ioe.toString());
+      ioe.printStackTrace();
+      fail(ioe.toString());
+    }
+
+    File f = new File(filePath.toString());
+    assertTrue("Could not find imported data file, " + f, f.exists());
+    BufferedReader r = null;
+    try {
+      // Read through the file and make sure it's all there.
+      r = new BufferedReader(new InputStreamReader(new FileInputStream(f)));
+      for (String expectedLine : expectedResults) {
+        assertEquals(expectedLine, r.readLine());
+      }
+    } catch (IOException ioe) {
+      LOG.error("Got IOException verifying results: " + ioe.toString());
+      ioe.printStackTrace();
+      fail(ioe.toString());
+    } finally {
+      IOUtils.closeStream(r);
+    }
+  }
+
+  @Test
+  public void testJdbcBasedImport() throws IOException {
+    String [] expectedResults = {
+        "2,Bob,2009-04-20,400.0,sales",
+        "3,Fred,2009-01-23,15.0,marketing"
+    };
+
+    doImportAndVerify(false, expectedResults);
+  }
+
+  @Test
+  public void testDirectImport() throws IOException {
+    String [] expectedResults = {
+        "2,Bob,2009-04-20,400,sales",
+        "3,Fred,2009-01-23,15,marketing"
+    };
+
+    doImportAndVerify(true, expectedResults);
+  }
+}
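
Note the one difference between the two expected-result sets: the JDBC path serializes the FLOAT salary through Java (400.0, 15.0), while the direct path takes psql's CSV rendering of the same values (400, 15). Same rows, two serializations:

    JDBC   : 2,Bob,2009-04-20,400.0,sales
    direct : 2,Bob,2009-04-20,400,sales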

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/testutil/ImportJobTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/testutil/ImportJobTestCase.java?rev=812486&r1=812485&r2=812486&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/testutil/ImportJobTestCase.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/testutil/ImportJobTestCase.java Tue Sep  8 12:52:55 2009
@@ -151,7 +151,9 @@
     setCurTableName(null); // clear user-override table name.
 
     try {
-      manager.close();
+      if (null != manager) {
+        manager.close();
+      }
     } catch (SQLException sqlE) {
       LOG.error("Got SQLException: " + sqlE.toString());
       fail("Got SQLException: " + sqlE.toString());


