hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tomwh...@apache.org
Subject svn commit: r781330 - in /hadoop/core/trunk: ./ src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ src/contrib/sqoop/src/java/org/ap...
Date Wed, 03 Jun 2009 10:22:57 GMT
Author: tomwhite
Date: Wed Jun  3 10:22:56 2009
New Revision: 781330

URL: http://svn.apache.org/viewvc?rev=781330&view=rev
Log:
HADOOP-5844. Use mysqldump when connecting to local mysql instance in Sqoop. Contributed by
Aaron Kimball.

Added:
    hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java
    hadoop/core/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/LocalMySQLTest.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ConnFactory.java
    hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java
    hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/MySQLManager.java
    hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java
    hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/orm/CompilationManager.java
    hadoop/core/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/AllTests.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=781330&r1=781329&r2=781330&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Jun  3 10:22:56 2009
@@ -131,6 +131,9 @@
     HADOOP-4861. Add disk usage with human-readable size (-duh).
     (Todd Lipcon via tomwhite)
 
+    HADOOP-5844. Use mysqldump when connecting to local mysql instance in Sqoop.
+    (Aaron Kimball via tomwhite)
+
   IMPROVEMENTS
 
     HADOOP-4565. Added CombineFileInputFormat to use data locality information

Modified: hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ConnFactory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ConnFactory.java?rev=781330&r1=781329&r2=781330&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ConnFactory.java (original)
+++ hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ConnFactory.java Wed Jun  3 10:22:56 2009
@@ -21,6 +21,7 @@
 import org.apache.hadoop.sqoop.manager.ConnManager;
 import org.apache.hadoop.sqoop.manager.GenericJdbcManager;
 import org.apache.hadoop.sqoop.manager.HsqldbManager;
+import org.apache.hadoop.sqoop.manager.LocalMySQLManager;
 import org.apache.hadoop.sqoop.manager.MySQLManager;
 
 import java.io.IOException;
@@ -70,7 +71,11 @@
     }
 
     if (scheme.equals("jdbc:mysql:")) {
-      return new MySQLManager(opts);
+      if (opts.isLocal()) {
+        return new LocalMySQLManager(opts);
+      } else {
+        return new MySQLManager(opts);
+      }
     } else if (scheme.equals("jdbc:hsqldb:hsql:")) {
       return new HsqldbManager(opts);
     } else {

Modified: hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java?rev=781330&r1=781329&r2=781330&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java (original)
+++ hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java Wed Jun  3 10:22:56 2009
@@ -92,6 +92,9 @@
   private String driverClassName;
   private String warehouseDir;
   private FileLayout layout;
+  private boolean local; // if true and conn is mysql, use mysqldump.
+
+  private String tmpDir; // where temp data goes; usually /tmp
 
   private static final String DEFAULT_CONFIG_FILE = "sqoop.properties";
 
@@ -136,6 +139,10 @@
       this.driverClassName = props.getProperty("jdbc.driver", this.driverClassName);
       this.warehouseDir = props.getProperty("hdfs.warehouse.dir", this.warehouseDir);
 
+      String localImport = props.getProperty("local.import",
+          Boolean.toString(this.local)).toLowerCase();
+      this.local = "true".equals(localImport) || "yes".equals(localImport)
+          || "1".equals(localImport);
     } catch (IOException ioe) {
       LOG.error("Could not read properties file " + DEFAULT_CONFIG_FILE + ": " + ioe.toString());
     } finally {
@@ -156,11 +163,12 @@
     this.hadoopHome = System.getenv("HADOOP_HOME");
     this.codeOutputDir = System.getProperty("sqoop.src.dir", ".");
 
-    String tmpDir = System.getProperty("test.build.data", "/tmp/");
-    if (!tmpDir.endsWith(File.separator)) {
-      tmpDir = tmpDir + File.separator;
+    String myTmpDir = System.getProperty("test.build.data", "/tmp/");
+    if (!myTmpDir.endsWith(File.separator)) {
+      myTmpDir = myTmpDir + File.separator;
     }
 
+    this.tmpDir = myTmpDir;
     this.jarOutputDir = tmpDir + "sqoop/compile";
     this.layout = FileLayout.TextFile;
 
@@ -178,6 +186,7 @@
     System.out.println("--driver (class-name)        Manually specify JDBC driver class to use");
     System.out.println("--username (username)        Set authentication username");
     System.out.println("--password (password)        Set authentication password");
+    System.out.println("--local                      Use local import fast path (mysql only)");
     System.out.println("");
     System.out.println("Import control options:");
     System.out.println("--table (tablename)          Table to read");
@@ -232,6 +241,8 @@
           this.action = ControlAction.ListTables;
         } else if (args[i].equals("--all-tables")) {
           this.allTables = true;
+        } else if (args[i].equals("--local")) {
+          this.local = true;
         } else if (args[i].equals("--username")) {
           this.username = args[++i];
           if (null == this.password) {
@@ -300,6 +311,13 @@
     }
   }
 
+  /** get the temporary directory; guaranteed to end in File.separator
+   * (e.g., '/')
+   */
+  public String getTmpDir() {
+    return tmpDir;
+  }
+
   public String getConnectString() {
     return connectString;
   }
@@ -336,6 +354,10 @@
     return password;
   }
 
+  public boolean isLocal() {
+    return local;
+  }
+
   /**
    * @return location where .java files go; guaranteed to end with '/'
    */
@@ -393,4 +415,8 @@
   public FileLayout getFileLayout() {
     return this.layout;
   }
+
+  public void setUsername(String name) {
+    this.username = name;
+  }
 }

Added: hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java?rev=781330&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java (added)
+++ hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/LocalMySQLManager.java Wed Jun  3 10:22:56 2009
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.manager;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Reader;
+import java.io.Writer;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.sqoop.ImportOptions;
+import org.apache.hadoop.sqoop.util.ImportError;
+
+/**
+ * Manages local connections to MySQL databases
+ * that are local to this machine -- so we can use mysqldump to get
+ * really fast dumps.
+ */
+public class LocalMySQLManager extends MySQLManager {
+
+  public static final Log LOG = LogFactory.getLog(LocalMySQLManager.class.getName());
+
+  public LocalMySQLManager(final ImportOptions options) {
+    super(options, false);
+  }
+
+  private static final String MYSQL_DUMP_CMD = "mysqldump";
+  
+  /**
+   * Import the table into HDFS by using mysqldump to pull out the data from
+   * the database and upload the files directly to HDFS.
+   */
+  public void importTable(String tableName, String jarFile, Configuration conf)
+      throws IOException, ImportError {
+
+    LOG.info("Beginning mysqldump fast path import");
+
+    if (options.getFileLayout() != ImportOptions.FileLayout.TextFile) {
+      // TODO(aaron): Support SequenceFile-based load-in
+      LOG.warn("File import layout " + options.getFileLayout()
+          + " is not supported by");
+      LOG.warn("MySQL local import; import will proceed as text files.");
+    }
+
+    ArrayList<String> args = new ArrayList<String>();
+
+    // We need to parse the connect string URI to determine the database
+    // name. Using java.net.URL directly on the connect string will fail because
+    // Java doesn't respect arbitrary JDBC-based schemes. So we chop off the scheme
+    // (everything before '://') and replace it with 'http', which we know will work.
+    String connectString = options.getConnectString();
+    String databaseName = null;
+    try {
+      String sanitizedString = null;
+      int schemeEndOffset = connectString.indexOf("://");
+      if (-1 == schemeEndOffset) {
+        // couldn't find one? try our best here.
+        sanitizedString = "http://" + connectString;
+        LOG.warn("Could not find database access scheme in connect string " + connectString);
+      } else {
+        sanitizedString = "http" + connectString.substring(schemeEndOffset);
+      }
+
+      URL connectUrl = new URL(sanitizedString);
+      databaseName = connectUrl.getPath();
+    } catch (MalformedURLException mue) {
+      LOG.error("Malformed connect string URL: " + connectString
+          + "; reason is " + mue.toString());
+    }
+
+    if (null == databaseName) {
+      throw new ImportError("Could not determine database name");
+    }
+
+    // database name was found from the 'path' part of the URL; trim leading '/'
+    while (databaseName.startsWith("/")) {
+      databaseName = databaseName.substring(1);
+    }
+
+    LOG.info("Performing import of table " + tableName + " from database " + databaseName);
+
+    args.add(MYSQL_DUMP_CMD); // requires that this is on the path.
+    args.add("--skip-opt");
+    args.add("--compact");
+    args.add("--no-create-db");
+    args.add("--no-create-info");
+
+    String username = options.getUsername();
+    if (null != username) {
+      args.add("--user=" + username);
+    }
+
+    String password = options.getPassword();
+    if (null != password) {
+      // TODO(aaron): This is really insecure.
+      args.add("--password=" + password);
+    }
+
+    args.add("--quick"); // no buffering
+    // TODO(aaron): Add a flag to allow --lock-tables instead for MyISAM data
+    args.add("--single-transaction"); 
+
+    args.add(databaseName);
+    args.add(tableName);
+
+    Process p = null;
+    try {
+      // begin the import in an external process.
+      LOG.debug("Starting mysqldump with arguments:");
+      for (String arg : args) {
+        LOG.debug("  " + arg);
+      }
+
+      p = Runtime.getRuntime().exec(args.toArray(new String[0]));
+
+      // read from the pipe, into HDFS.
+      InputStream is = p.getInputStream();
+      OutputStream os = null;
+
+      BufferedReader r = null;
+      BufferedWriter w = null;
+
+      try {
+        r = new BufferedReader(new InputStreamReader(is));
+
+        // create the paths/files in HDFS 
+        FileSystem fs = FileSystem.get(conf);
+        String warehouseDir = options.getWarehouseDir();
+        Path destDir = null;
+        if (null != warehouseDir) {
+          destDir = new Path(new Path(warehouseDir), tableName);
+        } else {
+          destDir = new Path(tableName);
+        }
+
+        LOG.debug("Writing to filesystem: " + conf.get("fs.default.name"));
+        LOG.debug("Creating destination directory " + destDir);
+        fs.mkdirs(destDir);
+        Path destFile = new Path(destDir, "data-00000");
+        LOG.debug("Opening output file: " + destFile);
+        if (fs.exists(destFile)) {
+          Path canonicalDest = destFile.makeQualified(fs);
+          throw new IOException("Destination file " + canonicalDest + " already exists");
+        }
+
+        os = fs.create(destFile);
+        w = new BufferedWriter(new OutputStreamWriter(os));
+
+        // Actually do the read/write transfer loop here.
+        int preambleLen = -1; // set to this for "undefined"
+        while (true) {
+          String inLine = r.readLine();
+          if (null == inLine) {
+            break; // EOF.
+          }
+
+          // this line is of the form "INSERT .. VALUES ( actual value text );"
+          // strip the leading preamble up to the '(' and the trailing ');'.
+          if (preambleLen == -1) {
+            // we haven't determined how long the preamble is. It's constant
+            // across all lines, so just figure this out once.
+            String recordStartMark = "VALUES (";
+            preambleLen = inLine.indexOf(recordStartMark) + recordStartMark.length();
+          }
+
+          // chop off the leading and trailing text as we write the
+          // output to HDFS.
+          w.write(inLine, preambleLen, inLine.length() - 2 - preambleLen);
+          w.newLine();
+        }
+      } finally {
+        LOG.info("Transfer loop complete.");
+        if (null != r) {
+          try {
+            r.close();
+          } catch (IOException ioe) {
+            LOG.info("Error closing FIFO stream: " + ioe.toString());
+          }
+        }
+
+        if (null != w) {
+          try {
+            w.close();
+          } catch (IOException ioe) {
+            LOG.info("Error closing HDFS stream: " + ioe.toString());
+          }
+        }
+      }
+    } finally {
+      int result = 0;
+      if (null != p) {
+        while (true) {
+          try {
+            result = p.waitFor();
+          } catch (InterruptedException ie) {
+            // interrupted; loop around.
+            continue;
+          }
+
+          break;
+        }
+      }
+
+      if (0 != result) {
+        throw new IOException("mysqldump terminated with status "
+            + Integer.toString(result));
+      }
+    }
+  }
+}
+

Modified: hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/MySQLManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/MySQLManager.java?rev=781330&r1=781329&r2=781330&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/MySQLManager.java (original)
+++ hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/MySQLManager.java Wed Jun  3 10:22:56 2009
@@ -41,6 +41,19 @@
 
   public MySQLManager(final ImportOptions opts) {
     super(DRIVER_CLASS, opts);
+
+    String connectString = opts.getConnectString();
+    if (null != connectString && connectString.indexOf("//localhost") != -1) {
+      // if we're not doing a remote connection, they should have a LocalMySQLManager.
+      LOG.warn("It looks like you are importing from mysql on localhost.");
+      LOG.warn("This transfer can be faster! Use the --local option to exercise a");
+      LOG.warn("MySQL-specific fast path.");
+    }
+  }
+
+  protected MySQLManager(final ImportOptions opts, boolean ignored) {
+    // constructor used by subclasses to avoid the --local warning.
+    super(DRIVER_CLASS, opts);
   }
 
   @Override

Modified: hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java?rev=781330&r1=781329&r2=781330&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java (original)
+++ hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/mapred/ImportJob.java Wed Jun  3 10:22:56 2009
@@ -43,9 +43,6 @@
 
 /**
  * Actually runs a jdbc import job using the ORM files generated by the sqoop.orm package.
- *
- * 
- *
  */
 public class ImportJob {
 

Modified: hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/orm/CompilationManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/orm/CompilationManager.java?rev=781330&r1=781329&r2=781330&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/orm/CompilationManager.java (original)
+++ hadoop/core/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/orm/CompilationManager.java Wed Jun  3 10:22:56 2009
@@ -130,6 +130,15 @@
       }
     }
 
+    // find sqoop jar for compilation classpath
+    String sqoopJar = findThisJar();
+    if (null != sqoopJar) {
+      sqoopJar = File.pathSeparator + sqoopJar;
+    } else {
+      LOG.warn("Could not find sqoop jar; child compilation may fail");
+      sqoopJar = "";
+    }
+
     String curClasspath = System.getProperty("java.class.path");
 
     args.add("-sourcepath");
@@ -140,7 +149,7 @@
     args.add(jarOutDir);
 
     args.add("-classpath");
-    args.add(curClasspath + File.pathSeparator + coreJar);
+    args.add(curClasspath + File.pathSeparator + coreJar + sqoopJar);
 
     // add all the source files
     for (String srcfile : sources) {

Modified: hadoop/core/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/AllTests.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/AllTests.java?rev=781330&r1=781329&r2=781330&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/AllTests.java (original)
+++ hadoop/core/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/AllTests.java Wed Jun  3 10:22:56 2009
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.sqoop;
 
+import org.apache.hadoop.sqoop.manager.LocalMySQLTest;
 import org.apache.hadoop.sqoop.manager.TestHsqldbManager;
 import org.apache.hadoop.sqoop.manager.TestSqlManager;
 import org.apache.hadoop.sqoop.orm.TestClassWriter;
@@ -44,6 +45,7 @@
     suite.addTestSuite(TestColumnTypes.class);
     suite.addTestSuite(TestMultiCols.class);
     suite.addTestSuite(TestOrderBy.class);
+    suite.addTestSuite(LocalMySQLTest.class);
 
     return suite;
   }

Added: hadoop/core/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/LocalMySQLTest.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/LocalMySQLTest.java?rev=781330&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/LocalMySQLTest.java (added)
+++ hadoop/core/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/LocalMySQLTest.java Wed Jun  3 10:22:56 2009
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.manager;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.FileInputStream;
+import java.io.File;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.sqoop.ImportOptions;
+import org.apache.hadoop.sqoop.testutil.ImportJobTestCase;
+
+/**
+ * Test the LocalMySQLManager implementation.
+ * This differs from MySQLManager only in its importTable() method, which
+ * uses mysqldump instead of mapreduce+DBInputFormat.
+ *
+ * Since this requires a MySQL installation on your local machine to use, this
+ * class is named in such a way that Hadoop's default QA process does not run
+ * it. You need to run this manually with -Dtestcase=LocalMySQLTest.
+ *
+ * You need to put MySQL's Connector/J JDBC driver library into a location
+ * where Hadoop will be able to access it (since this library cannot be checked
+ * into Apache's tree for licensing reasons).
+ *
+ * You should also create a database named 'sqooptestdb' and authorize yourself:
+ *
+ * CREATE DATABASE sqooptestdb;
+ * use mysql;
+ * GRANT ALL PRIVILEGES ON sqooptestdb.* TO 'yourusername'@'localhost';
+ * GRANT FILE ON *.* TO 'yourusername'@'localhost';
+ * flush privileges;
+ *
+ * The above will authorize you to use file-level access to the database.
+ * This privilege is global and cannot be applied on a per-schema basis
+ * (e.g., just to sqooptestdb).
+ */
+public class LocalMySQLTest extends ImportJobTestCase {
+
+  public static final Log LOG = LogFactory.getLog(LocalMySQLTest.class.getName());
+
+  static final String MYSQL_DATABASE_NAME = "sqooptestdb";
+  static final String TABLE_NAME = "EMPLOYEES";
+  static final String CONNECT_STRING = "jdbc:mysql://localhost/" + MYSQL_DATABASE_NAME;
+
+  // instance variables populated during setUp, used during tests
+  private LocalMySQLManager manager;
+
+  @Before
+  public void setUp() {
+    ImportOptions options = new ImportOptions(CONNECT_STRING, TABLE_NAME);
+    options.setUsername(getCurrentUser());
+    manager = new LocalMySQLManager(options);
+
+    Connection connection = null;
+    Statement st = null;
+
+    try {
+      connection = manager.getConnection();
+      connection.setAutoCommit(false);
+      st = connection.createStatement();
+
+      // create the database table and populate it with data. 
+      st.executeUpdate("DROP TABLE IF EXISTS " + TABLE_NAME);
+      st.executeUpdate("CREATE TABLE " + TABLE_NAME + " ("
+          + "id INT NOT NULL PRIMARY KEY AUTO_INCREMENT, "
+          + "name VARCHAR(24) NOT NULL, "
+          + "start_date DATE, "
+          + "salary FLOAT, "
+          + "dept VARCHAR(32))");
+
+      st.executeUpdate("INSERT INTO " + TABLE_NAME + " VALUES("
+          + "NULL,'Aaron','2009-05-14',1000000.00,'engineering')");
+      st.executeUpdate("INSERT INTO " + TABLE_NAME + " VALUES("
+          + "NULL,'Bob','2009-04-20',400.00,'sales')");
+      st.executeUpdate("INSERT INTO " + TABLE_NAME + " VALUES("
+          + "NULL,'Fred','2009-01-23',15.00,'marketing')");
+      connection.commit();
+    } catch (SQLException sqlE) {
+      LOG.error("Encountered SQL Exception: " + sqlE);
+      sqlE.printStackTrace();
+      fail("SQLException when running test setUp(): " + sqlE);
+    } finally {
+      try {
+        if (null != st) {
+          st.close();
+        }
+
+        if (null != connection) {
+          connection.close();
+        }
+      } catch (SQLException sqlE) {
+        LOG.warn("Got SQLException when closing connection: " + sqlE);
+      }
+    }
+  }
+
+  @After
+  public void tearDown() {
+    try {
+      manager.close();
+    } catch (SQLException sqlE) {
+      LOG.error("Got SQLException: " + sqlE.toString());
+      fail("Got SQLException: " + sqlE.toString());
+    }
+  }
+
+  /** @return the current username. */
+  private String getCurrentUser() {
+    // First, check the $USER environment variable.
+    String envUser = System.getenv("USER");
+    if (null != envUser) {
+      return envUser;
+    }
+
+    // Try `whoami`
+    String [] whoamiArgs = new String[1];
+    whoamiArgs[0] = "whoami";
+    Process p = null;
+    BufferedReader r = null;
+    try {
+      p = Runtime.getRuntime().exec(whoamiArgs);
+      InputStream is = p.getInputStream();
+      r = new BufferedReader(new InputStreamReader(is));
+      return r.readLine();
+    } catch (IOException ioe) {
+      LOG.error("IOException reading from `whoami`: " + ioe.toString());
+      return null;
+    } finally {
+      // close our stream.
+      if (null != r) {
+        try {
+          r.close();
+        } catch (IOException ioe) {
+          LOG.warn("IOException closing input stream from `whoami`: " + ioe.toString());
+        }
+      }
+
+      // wait for whoami to exit.
+      while (p != null) {
+        try {
+          int ret = p.waitFor();
+          if (0 != ret) {
+            LOG.error("whoami exited with error status " + ret);
+            // suppress original return value from this method.
+            return null; 
+          }
+        } catch (InterruptedException ie) {
+          continue; // loop around.
+        }
+      }
+    }
+  }
+
+  private String [] getArgv(boolean includeHadoopFlags) {
+    ArrayList<String> args = new ArrayList<String>();
+
+    if (includeHadoopFlags) {
+      args.add("-D");
+      args.add("fs.default.name=file:///");
+    }
+
+    args.add("--table");
+    args.add(TABLE_NAME);
+    args.add("--warehouse-dir");
+    args.add(getWarehouseDir());
+    args.add("--connect");
+    args.add(CONNECT_STRING);
+    args.add("--local");
+    args.add("--username");
+    args.add(getCurrentUser());
+
+    return args.toArray(new String[0]);
+  }
+
+  @Test
+  public void testLocalBulkImport() {
+    String [] argv = getArgv(true);
+    try {
+      runImport(argv);
+    } catch (IOException ioe) {
+      LOG.error("Got IOException during import: " + ioe.toString());
+      ioe.printStackTrace();
+      fail(ioe.toString());
+    }
+
+    Path warehousePath = new Path(this.getWarehouseDir());
+    Path tablePath = new Path(warehousePath, TABLE_NAME);
+    Path filePath = new Path(tablePath, "data-00000");
+
+    File f = new File(filePath.toString());
+    assertTrue("Could not find imported data file", f.exists());
+    BufferedReader r = null;
+    try {
+      // Read through the file and make sure it's all there.
+      r = new BufferedReader(new InputStreamReader(new FileInputStream(f)));
+      assertEquals("1,'Aaron','2009-05-14',1e+06,'engineering'", r.readLine());
+      assertEquals("2,'Bob','2009-04-20',400,'sales'", r.readLine());
+      assertEquals("3,'Fred','2009-01-23',15,'marketing'", r.readLine());
+    } catch (IOException ioe) {
+      LOG.error("Got IOException verifying results: " + ioe.toString());
+      ioe.printStackTrace();
+      fail(ioe.toString());
+    } finally {
+      IOUtils.closeStream(r);
+    }
+  }
+}



Mime
View raw message