hadoop-mapreduce-commits mailing list archives

From tomwh...@apache.org
Subject svn commit: r883452 - in /hadoop/mapreduce/trunk: ./ src/contrib/sqoop/doc/ src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/ src/con...
Date Mon, 23 Nov 2009 18:37:46 GMT
Author: tomwhite
Date: Mon Nov 23 18:37:27 2009
New Revision: 883452

URL: http://svn.apache.org/viewvc?rev=883452&view=rev
Log:
MAPREDUCE-1169. Improvements to mysqldump use in Sqoop. Contributed by Aaron Kimball.

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/sqoop/doc/Sqoop-manpage.txt
    hadoop/mapreduce/trunk/src/contrib/sqoop/doc/direct.txt
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/DirectImportUtils.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/LocalMySQLTest.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=883452&r1=883451&r2=883452&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Mon Nov 23 18:37:27 2009
@@ -48,6 +48,9 @@
     MAPREDUCE-1167. ProcfsBasedProcessTree collects rss memory information.
     (Scott Chen via dhruba)
 
+    MAPREDUCE-1169. Improvements to mysqldump use in Sqoop.
+    (Aaron Kimball via tomwhite)
+
   OPTIMIZATIONS
 
     MAPREDUCE-270. Fix the tasktracker to optionally send an out-of-band

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/doc/Sqoop-manpage.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/doc/Sqoop-manpage.txt?rev=883452&r1=883451&r2=883452&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/doc/Sqoop-manpage.txt (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/doc/Sqoop-manpage.txt Mon Nov 23 18:37:27 2009
@@ -163,6 +163,14 @@
 --list-tables::
   List tables in database and exit
 
+Database-specific options
+~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Additional arguments may be passed to the database manager
+after a lone '-' on the command-line.
+
+In MySQL direct mode, additional arguments are passed directly to
+mysqldump.
 
 ENVIRONMENT
 -----------

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/doc/direct.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/doc/direct.txt?rev=883452&r1=883451&r2=883452&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/doc/direct.txt (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/doc/direct.txt Mon Nov 23 18:37:27 2009
@@ -18,16 +18,15 @@
 
 
 Direct-mode Imports
-------------------
+-------------------
 
 While the JDBC-based import method used by Sqoop provides it with the
 ability to read from a variety of databases using a generic driver, it
 is not the most high-performance method available. Sqoop can read from
 certain database systems faster by using their built-in export tools.
 
-For example, Sqoop can read from a local MySQL database by using the +mysqldump+
-tool distributed with MySQL. If you run Sqoop on the same machine where a
-MySQL database is present, you can take advantage of this faster
+For example, Sqoop can read from a MySQL database by using the +mysqldump+
+tool distributed with MySQL. You can take advantage of this faster
 import method by running Sqoop with the +--direct+ argument. This
 combined with a connect string that begins with +jdbc:mysql://+ will
 inform Sqoop that it should select the faster access method.
@@ -58,4 +57,21 @@
 subsequent MapReduce programs to use multiple mappers across your data
 in parallel.
 
+Tool-specific arguments
+~~~~~~~~~~~~~~~~~~~~~~~
+
+Sqoop will generate a set of command-line arguments with which it invokes
+the underlying direct-mode tool (e.g., mysqldump). You can pass additional
+arguments through to the tool by listing them on the Sqoop command line
+after a single '+-+' argument, e.g.:
+
+----
+$ sqoop --connect jdbc:mysql://localhost/db --table foo --direct - --lock-tables
+----
+
+The +--lock-tables+ argument (and anything else to the right of the +-+ argument)
+will be passed directly to mysqldump.
+
+
+
 

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java?rev=883452&r1=883451&r2=883452&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java Mon Nov 23 18:37:27 2009
@@ -24,6 +24,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.Properties;
 
 import org.apache.commons.logging.Log;
@@ -124,6 +125,8 @@
 
   private static final String DEFAULT_CONFIG_FILE = "sqoop.properties";
 
+  private String [] extraArgs;
+
   public ImportOptions() {
     initDefaults();
   }
@@ -254,6 +257,8 @@
 
     this.conf = new Configuration();
 
+    this.extraArgs = null;
+
     loadFromProperties();
   }
 
@@ -327,6 +332,10 @@
     System.out.println("--list-databases             List all databases available and exit");
     System.out.println("--debug-sql (statement)      Execute 'statement' in SQL and exit");
     System.out.println("");
+    System.out.println("Database-specific options:");
+    System.out.println("Arguments may be passed to the database manager after a lone '-':");
+    System.out.println("  MySQL direct mode: arguments passed directly to mysqldump");
+    System.out.println("");
     System.out.println("Generic Hadoop command-line options:");
     ToolRunner.printGenericCommandUsage(System.out);
     System.out.println("");
@@ -546,6 +555,13 @@
         } else if (args[i].equals("--help")) {
           printUsage();
           throw new InvalidOptionsException("");
+        } else if (args[i].equals("-")) {
+          // Everything after a lone '-' goes into extraArgs.
+          ArrayList<String> extra = new ArrayList<String>();
+          for (i++; i < args.length; i++) {
+            extra.add(args[i]);
+          }
+          this.extraArgs = extra.toArray(new String[0]);
         } else {
           throw new InvalidOptionsException("Invalid argument: " + args[i] + ".\n"
               + "Try --help for usage.");
@@ -882,4 +898,19 @@
   public void setConf(Configuration config) {
     this.conf = config;
   }
+
+  /**
+   * @return command-line arguments after a '-'
+   */
+  public String [] getExtraArgs() {
+    if (extraArgs == null) {
+      return null;
+    }
+
+    String [] out = new String[extraArgs.length];
+    for (int i = 0; i < extraArgs.length; i++) {
+      out[i] = extraArgs[i];
+    }
+    return out;
+  }
 }
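
The parsing branch added above treats a lone '-' as a sentinel: everything to
its right is collected verbatim into extraArgs, and getExtraArgs() hands back
a defensive copy so callers cannot mutate the stored array. A minimal
self-contained sketch of the same pattern (class and method names here are
illustrative, not Sqoop's API):

----
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SentinelArgsDemo {
  private String [] extraArgs;

  /** Collect everything after a lone "-" into extraArgs. */
  public void parse(String [] args) {
    for (int i = 0; i < args.length; i++) {
      if (args[i].equals("-")) {
        List<String> extra = new ArrayList<String>();
        for (i++; i < args.length; i++) {
          extra.add(args[i]);
        }
        this.extraArgs = extra.toArray(new String[0]);
      }
      // ... ordinary option handling would go here ...
    }
  }

  /** @return a defensive copy of the trailing arguments, or null. */
  public String [] getExtraArgs() {
    return (null == extraArgs)
        ? null : Arrays.copyOf(extraArgs, extraArgs.length);
  }

  public static void main(String [] args) {
    SentinelArgsDemo demo = new SentinelArgsDemo();
    demo.parse(new String [] { "--direct", "-", "--lock-tables" });
    System.out.println(Arrays.toString(demo.getExtraArgs()));
    // prints: [--lock-tables]
  }
}
----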

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java?rev=883452&r1=883451&r2=883452&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/DirectPostgresqlManager.java Mon Nov 23 18:37:27 2009
@@ -27,8 +27,6 @@
 import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.io.OutputStreamWriter;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -38,14 +36,15 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.sqoop.ImportOptions;
 import org.apache.hadoop.sqoop.io.SplittableBufferedWriter;
+import org.apache.hadoop.sqoop.util.AsyncSink;
 import org.apache.hadoop.sqoop.util.DirectImportUtils;
 import org.apache.hadoop.sqoop.util.ErrorableAsyncSink;
 import org.apache.hadoop.sqoop.util.ErrorableThread;
 import org.apache.hadoop.sqoop.util.Executor;
 import org.apache.hadoop.sqoop.util.ImportError;
 import org.apache.hadoop.sqoop.util.JdbcUrl;
+import org.apache.hadoop.sqoop.util.LoggingAsyncSink;
 import org.apache.hadoop.sqoop.util.PerfCounters;
-import org.apache.hadoop.sqoop.util.AsyncSink;
 
 /**
  * Manages direct dumps from Postgresql databases via psql COPY TO STDOUT
@@ -273,23 +272,6 @@
     return tempFile.toString();
   }
 
-  /** @return true if someHost refers to localhost.
-   */
-  private boolean isLocalhost(String someHost) {
-    if (null == someHost) {
-      return false;
-    }
-
-    try {
-      InetAddress localHostAddr = InetAddress.getLocalHost();
-      InetAddress someAddr = InetAddress.getByName(someHost);
-
-      return localHostAddr.equals(someAddr);
-    } catch (UnknownHostException uhe) {
-      return false;
-    }
-  }
-
   @Override
   /**
    * Import the table into HDFS by using psql to pull the data out of the db
@@ -315,6 +297,7 @@
     String passwordFilename = null;
     Process p = null;
     AsyncSink sink = null;
+    AsyncSink errSink = null;
     PerfCounters counters = new PerfCounters();
 
     try {
@@ -361,7 +344,7 @@
         }
       }
 
-      if (!isLocalhost(hostname) || port != -1) {
+      if (!DirectImportUtils.isLocalhost(hostname) || port != -1) {
         args.add("--host");
         args.add(hostname);
         args.add("--port");
@@ -397,6 +380,8 @@
       LOG.debug("Starting stream sink");
       counters.startClock();
       sink.processStream(is);
+      errSink = new LoggingAsyncSink(LOG);
+      errSink.processStream(p.getErrorStream());
     } finally {
       // block until the process is done.
       LOG.debug("Waiting for process completion");
@@ -444,6 +429,18 @@
         }
       }
 
+      // Attempt to block for stderr stream sink; errors are advisory.
+      if (null != errSink) {
+        try {
+          if (0 != errSink.join()) {
+            LOG.info("Encountered exception reading stderr stream");
+          }
+        } catch (InterruptedException ie) {
+          LOG.info("Thread interrupted waiting for stderr to complete: "
+              + ie.toString());
+        }
+      }
+
       LOG.info("Transfer loop complete.");
 
       if (0 != result) {
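
Both managers now attach a LoggingAsyncSink to the child process's stderr in
addition to the stdout sink. Draining stderr matters because a child process
can block once the stderr pipe buffer fills. The sink's implementation is not
part of this diff; the sketch below shows the general pattern it presumably
follows -- a daemon thread copying the stream to a log (names and details are
assumptions, not the actual Sqoop class):

----
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

import org.apache.commons.logging.Log;

/** Sketch: asynchronously copy a stream to a commons-logging Log. */
class LoggingSinkSketch {
  private final Log log;
  private Thread worker;

  LoggingSinkSketch(Log log) {
    this.log = log;
  }

  /** Start a daemon thread that logs each line read from the stream. */
  void processStream(final InputStream in) {
    worker = new Thread() {
      public void run() {
        try {
          BufferedReader r = new BufferedReader(new InputStreamReader(in));
          for (String line = r.readLine(); null != line;
              line = r.readLine()) {
            log.info(line);
          }
        } catch (IOException ioe) {
          log.warn("Error reading stream: " + ioe.toString());
        }
      }
    };
    worker.setDaemon(true);
    worker.start();
  }

  /** Block until the stream is exhausted; returns 0 on success. */
  int join() throws InterruptedException {
    worker.join();
    return 0;
  }
}
----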

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java?rev=883452&r1=883451&r2=883452&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java Mon Nov 23 18:37:27 2009
@@ -38,13 +38,14 @@
 import org.apache.hadoop.sqoop.io.SplittableBufferedWriter;
 import org.apache.hadoop.sqoop.lib.FieldFormatter;
 import org.apache.hadoop.sqoop.lib.RecordParser;
+import org.apache.hadoop.sqoop.util.AsyncSink;
 import org.apache.hadoop.sqoop.util.DirectImportUtils;
 import org.apache.hadoop.sqoop.util.ErrorableAsyncSink;
 import org.apache.hadoop.sqoop.util.ErrorableThread;
 import org.apache.hadoop.sqoop.util.ImportError;
 import org.apache.hadoop.sqoop.util.JdbcUrl;
+import org.apache.hadoop.sqoop.util.LoggingAsyncSink;
 import org.apache.hadoop.sqoop.util.PerfCounters;
-import org.apache.hadoop.sqoop.util.AsyncSink;
 
 /**
  * Manages direct connections to MySQL databases
@@ -365,6 +366,8 @@
     // (everything before '://') and replace it with 'http', which we know will work.
     String connectString = options.getConnectString();
     String databaseName = JdbcUrl.getDatabaseName(connectString);
+    String hostname = JdbcUrl.getHostName(connectString);
+    int port = JdbcUrl.getPort(connectString);
 
     if (null == databaseName) {
       throw new ImportError("Could not determine database name");
@@ -378,6 +381,7 @@
 
     Process p = null;
     AsyncSink sink = null;
+    AsyncSink errSink = null;
     PerfCounters counters = new PerfCounters();
     try {
       // --defaults-file must be the first argument.
@@ -394,20 +398,31 @@
         args.add(whereClause);
       }
 
+      if (!DirectImportUtils.isLocalhost(hostname) || port != -1) {
+        args.add("--host=" + hostname);
+        args.add("--port=" + Integer.toString(port));
+      }
+
       args.add("--skip-opt");
       args.add("--compact");
       args.add("--no-create-db");
       args.add("--no-create-info");
       args.add("--quick"); // no buffering
-      // TODO(aaron): Add a flag to allow --lock-tables instead for MyISAM data
       args.add("--single-transaction"); 
-      // TODO(aaron): Add --host and --port arguments to support remote direct connects.
 
       String username = options.getUsername();
       if (null != username) {
         args.add("--user=" + username);
       }
 
+      // If the user supplied extra args, add them here.
+      String [] extra = options.getExtraArgs();
+      if (null != extra) {
+        for (String arg : extra) {
+          args.add(arg);
+        }
+      }
+
       args.add(databaseName);
       args.add(tableName);
 
@@ -442,6 +457,10 @@
       // Start an async thread to read and upload the whole stream.
       counters.startClock();
       sink.processStream(is);
+
+      // Start an async thread to send stderr to log4j.
+      errSink = new LoggingAsyncSink(LOG);
+      errSink.processStream(p.getErrorStream());
     } finally {
 
       // block until the process is done.
@@ -482,6 +501,18 @@
         }
       }
 
+      // Try to wait for stderr to finish, but regard any errors as advisory.
+      if (null != errSink) {
+        try {
+          if (0 != errSink.join()) {
+            LOG.info("Encountered exception reading stderr stream");
+          }
+        } catch (InterruptedException ie) {
+          LOG.info("Thread interrupted waiting for stderr to complete: "
+              + ie.toString());
+        }
+      }
+
       LOG.info("Transfer loop complete.");
 
       if (0 != result) {
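
The two TODOs removed above are both addressed by this change: remote
connects now get explicit --host/--port arguments, and MyISAM users can
request --lock-tables through the new pass-through mechanism. For a remote
import, the assembled mysqldump argument order is Sqoop's fixed options
first, then user-supplied extras, then database and table. A small
illustration of that ordering, with made-up values standing in for
JdbcUrl.getHostName(), JdbcUrl.getPort(), and ImportOptions.getExtraArgs():

----
import java.util.ArrayList;
import java.util.List;

public class MysqldumpArgvDemo {
  public static void main(String [] argv) {
    String hostname = "db.example.com"; // made-up
    int port = 3306;                    // made-up
    String [] extra = { "--lock-tables" };

    List<String> args = new ArrayList<String>();
    // Emitted only when the host is remote or a port was given.
    args.add("--host=" + hostname);
    args.add("--port=" + Integer.toString(port));
    args.add("--skip-opt");
    args.add("--compact");
    args.add("--no-create-db");
    args.add("--no-create-info");
    args.add("--quick");
    args.add("--single-transaction");
    args.add("--user=someuser");
    // User-supplied extras land after Sqoop's own options...
    for (String arg : extra) {
      args.add(arg);
    }
    // ...and just before the database and table names.
    args.add("somedb");
    args.add("sometable");
    System.out.println("mysqldump " + args);
  }
}
----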

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/DirectImportUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/DirectImportUtils.java?rev=883452&r1=883451&r2=883452&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/DirectImportUtils.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/DirectImportUtils.java Mon Nov 23 18:37:27 2009
@@ -20,6 +20,8 @@
 
 import java.io.IOException;
 import java.io.File;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -88,5 +90,22 @@
         new SplittingOutputStream(conf, destDir, "data-",
         options.getDirectSplitSize(), options.shouldUseCompression()));
   }
+
+  /** @return true if someHost refers to localhost.
+   */
+  public static boolean isLocalhost(String someHost) {
+    if (null == someHost) {
+      return false;
+    }
+
+    try {
+      InetAddress localHostAddr = InetAddress.getLocalHost();
+      InetAddress someAddr = InetAddress.getByName(someHost);
+
+      return localHostAddr.equals(someAddr);
+    } catch (UnknownHostException uhe) {
+      return false;
+    }
+  }
 }
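
One behavioral note on the relocated isLocalhost(): it compares the host's
primary address (InetAddress.getLocalHost()) with the resolved target, so on
some machines "localhost" resolves to 127.0.0.1 and does not compare equal to
the primary address. A quick probe to observe the comparison on a given
machine (illustrative only):

----
import java.net.InetAddress;

public class IsLocalhostProbe {
  public static void main(String [] args) throws Exception {
    InetAddress primary = InetAddress.getLocalHost();
    InetAddress loopback = InetAddress.getByName("localhost");
    System.out.println("getLocalHost()       = " + primary);
    System.out.println("getByName(localhost) = " + loopback);
    // isLocalhost("localhost") reduces to this equality test:
    System.out.println("equal? " + primary.equals(loopback));
  }
}
----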
 

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/LocalMySQLTest.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/LocalMySQLTest.java?rev=883452&r1=883451&r2=883452&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/LocalMySQLTest.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/LocalMySQLTest.java Mon Nov 23 18:37:27 2009
@@ -181,7 +181,7 @@
     }
   }
 
-  private String [] getArgv(boolean mysqlOutputDelims) {
+  private String [] getArgv(boolean mysqlOutputDelims, String... extraArgs) {
     ArrayList<String> args = new ArrayList<String>();
 
     args.add("-D");
@@ -205,11 +205,17 @@
       args.add("--mysql-delimiters");
     }
 
+    if (null != extraArgs) {
+      for (String arg : extraArgs) {
+        args.add(arg);
+      }
+    }
+
     return args.toArray(new String[0]);
   }
 
-  private void doLocalBulkImport(boolean mysqlOutputDelims, String [] expectedResults)
-      throws IOException {
+  private void doLocalBulkImport(boolean mysqlOutputDelims,
+      String [] expectedResults, String [] extraArgs) throws IOException {
 
     Path warehousePath = new Path(this.getWarehouseDir());
     Path tablePath = new Path(warehousePath, TABLE_NAME);
@@ -221,7 +227,7 @@
       FileListing.recursiveDeleteDir(tableFile);
     }
 
-    String [] argv = getArgv(mysqlOutputDelims);
+    String [] argv = getArgv(mysqlOutputDelims, extraArgs);
     try {
       runImport(argv);
     } catch (IOException ioe) {
@@ -256,7 +262,20 @@
         "3,Fred,2009-01-23,15,marketing"
     };
 
-    doLocalBulkImport(false, expectedResults);
+    doLocalBulkImport(false, expectedResults, null);
+  }
+
+  @Test
+  public void testWithExtraParams() throws IOException {
+    // no quoting of strings allowed.
+    String [] expectedResults = {
+        "2,Bob,2009-04-20,400,sales",
+        "3,Fred,2009-01-23,15,marketing"
+    };
+
+    String [] extraArgs = { "-", "--lock-tables" };
+
+    doLocalBulkImport(false, expectedResults, extraArgs);
   }
 
   @Test
@@ -267,6 +286,6 @@
         "3,'Fred','2009-01-23',15,'marketing'"
     };
 
-    doLocalBulkImport(true, expectedResults);
+    doLocalBulkImport(true, expectedResults, null);
   }
 }
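
End to end, the new testWithExtraParams() appends { "-", "--lock-tables" } to
the Sqoop argv; ImportOptions routes "--lock-tables" into extraArgs, and
LocalMySQLManager appends it to the mysqldump invocation. Assuming the
parsing entry point is parse(String []) (the method containing the hunk
above, though its signature is not shown in this diff), the round trip would
look like:

----
// Hypothetical round-trip sketch; parse(String []) is assumed.
import java.util.Arrays;

import org.apache.hadoop.sqoop.ImportOptions;

public class ExtraArgsRoundTrip {
  public static void main(String [] argv) throws Exception {
    ImportOptions opts = new ImportOptions();
    opts.parse(new String [] {
        "--connect", "jdbc:mysql://localhost/db",
        "--table", "foo", "--direct",
        "-", "--lock-tables" });
    // Expected output: [--lock-tables]
    System.out.println(Arrays.toString(opts.getExtraArgs()));
  }
}
----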


