hadoop-mapreduce-commits mailing list archives

From: tomwh...@apache.org
Subject: svn commit: r787746 - in /hadoop/mapreduce/trunk: ./ src/contrib/sqoop/ src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/ src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ src/contrib...
Date: Tue, 23 Jun 2009 16:33:58 GMT
Author: tomwhite
Date: Tue Jun 23 16:33:57 2009
New Revision: 787746

URL: http://svn.apache.org/viewvc?rev=787746&view=rev
Log:
HADOOP-5887. Sqoop should create tables in Hive metastore after importing to HDFS. Contributed by Aaron Kimball.

Added:
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/HiveImport.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/HiveTypes.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/TableDefWriter.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/Executor.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/LoggingStreamHandlerFactory.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/NullStreamHandlerFactory.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/StreamHandlerFactory.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/hive/
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/hive/TestHiveImport.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/testdata/
    hadoop/mapreduce/trunk/src/contrib/sqoop/testdata/hive/
    hadoop/mapreduce/trunk/src/contrib/sqoop/testdata/hive/bin/
    hadoop/mapreduce/trunk/src/contrib/sqoop/testdata/hive/bin/hive
    hadoop/mapreduce/trunk/src/contrib/sqoop/testdata/hive/scripts/
    hadoop/mapreduce/trunk/src/contrib/sqoop/testdata/hive/scripts/dateImport.q
    hadoop/mapreduce/trunk/src/contrib/sqoop/testdata/hive/scripts/failingImport.q
    hadoop/mapreduce/trunk/src/contrib/sqoop/testdata/hive/scripts/normalImport.q
    hadoop/mapreduce/trunk/src/contrib/sqoop/testdata/hive/scripts/numericImport.q
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/sqoop/build.xml
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/Sqoop.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ConnManager.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/orm/ClassWriter.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/AllTests.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/testutil/ImportJobTestCase.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=787746&r1=787745&r2=787746&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Tue Jun 23 16:33:57 2009
@@ -4,10 +4,13 @@
 
   NEW FEATURES
 
+    HADOOP-5887. Sqoop should create tables in Hive metastore after importing
+    to HDFS. (Aaron Kimball via tomwhite)
+
   IMPROVEMENTS
 
   BUG FIXES
-    HADOOP-4687. HDFS is split from Hadoop Core. It is a subproject under 
+    HADOOP-4687. MapReduce is split from Hadoop Core. It is a subproject under 
     Hadoop (Owen O'Malley)
 
     HADOOP-6096. Fix Eclipse project and classpath files following project

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/build.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/build.xml?rev=787746&r1=787745&r2=787746&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/build.xml (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/build.xml Tue Jun 23 16:33:57 2009
@@ -70,6 +70,12 @@
       <sysproperty key="hadoop.test.localoutputfile" value="${hadoop.test.localoutputfile}"/>
       <sysproperty key="hadoop.log.dir" value="${hadoop.log.dir}"/>
 
+      <!-- we have a mock "hive" shell instance in our testdata directory
+           for testing hive integration. Set this property here to ensure
+           that the unit tests pick it up.
+      -->
+      <sysproperty key="hive.home" value="${basedir}/testdata/hive" />
+
       <!-- tools.jar from Sun JDK also required to invoke javac. -->
       <classpath>
         <path refid="test.classpath"/>

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java?rev=787746&r1=787745&r2=787746&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/ImportOptions.java Tue Jun 23 16:33:57 2009
@@ -93,8 +93,9 @@
   private String warehouseDir;
   private FileLayout layout;
   private boolean local; // if true and conn is mysql, use mysqldump.
-
   private String tmpDir; // where temp data goes; usually /tmp
+  private String hiveHome;
+  private boolean hiveImport;
 
   private static final String DEFAULT_CONFIG_FILE = "sqoop.properties";
 
@@ -138,11 +139,17 @@
       this.orderByCol = props.getProperty("db.sort.column", this.orderByCol);
       this.driverClassName = props.getProperty("jdbc.driver", this.driverClassName);
       this.warehouseDir = props.getProperty("hdfs.warehouse.dir", this.warehouseDir);
+      this.hiveHome = props.getProperty("hive.home", this.hiveHome);
 
       String localImport = props.getProperty("local.import",
           Boolean.toString(this.local)).toLowerCase();
       this.local = "true".equals(localImport) || "yes".equals(localImport)
           || "1".equals(localImport);
+
+      String hiveImportStr = props.getProperty("hive.import",
+          Boolean.toString(this.hiveImport)).toLowerCase();
+      this.hiveImport = "true".equals(hiveImportStr) || "yes".equals(hiveImportStr)
+          || "1".equals(hiveImportStr);
     } catch (IOException ioe) {
       LOG.error("Could not read properties file " + DEFAULT_CONFIG_FILE + ": " + ioe.toString());
     } finally {
@@ -156,11 +163,25 @@
     }
   }
 
+  /**
+   * @return the temp directory to use; this is guaranteed to end with
+   * the file separator character (e.g., '/')
+   */
+  public String getTempDir() {
+    return this.tmpDir;
+  }
+
   private void initDefaults() {
     // first, set the true defaults if nothing else happens.
     // default action is to run the full pipeline.
     this.action = ControlAction.FullImport;
     this.hadoopHome = System.getenv("HADOOP_HOME");
+
+    // Set this with $HIVE_HOME, but -Dhive.home can override.
+    this.hiveHome = System.getenv("HIVE_HOME");
+    this.hiveHome = System.getProperty("hive.home", this.hiveHome);
+
+    // Set this to cwd, but -Dsqoop.src.dir can override.
     this.codeOutputDir = System.getProperty("sqoop.src.dir", ".");
 
     String myTmpDir = System.getProperty("test.build.data", "/tmp/");
@@ -193,11 +214,13 @@
     System.out.println("--columns (col,col,col...)   Columns to export from table");
     System.out.println("--order-by (column-name)     Column of the table used to order results");
     System.out.println("--hadoop-home (dir)          Override $HADOOP_HOME");
+    System.out.println("--hive-home (dir)            Override $HIVE_HOME");
     System.out.println("--warehouse-dir (dir)        HDFS path for table destination");
     System.out.println("--as-sequencefile            Imports data to SequenceFiles");
     System.out.println("--as-textfile                Imports data as plain text (default)");
     System.out.println("--all-tables                 Import all tables in database");
     System.out.println("                             (Ignores --table, --columns and --order-by)");
+    System.out.println("--hive-import                If set, then import the table into Hive");
     System.out.println("");
     System.out.println("Code generation options:");
     System.out.println("--outdir (dir)               Output directory for generated code");
@@ -254,6 +277,10 @@
           this.password = args[++i];
         } else if (args[i].equals("--hadoop-home")) {
           this.hadoopHome = args[++i];
+        } else if (args[i].equals("--hive-home")) {
+          this.hiveHome = args[++i];
+        } else if (args[i].equals("--hive-import")) {
+          this.hiveImport = true;
         } else if (args[i].equals("--outdir")) {
           this.codeOutputDir = args[++i];
         } else if (args[i].equals("--as-sequencefile")) {
@@ -358,6 +385,15 @@
     return local;
   }
 
+  public String getHiveHome() {
+    return hiveHome;
+  }
+
+  /** @return true if we should import the table into Hive */
+  public boolean doHiveImport() {
+    return hiveImport;
+  }
+
   /**
    * @return location where .java files go; guaranteed to end with '/'
    */

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/Sqoop.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/Sqoop.java?rev=787746&r1=787745&r2=787746&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/Sqoop.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/Sqoop.java Tue Jun 23 16:33:57 2009
@@ -26,6 +26,7 @@
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
+import org.apache.hadoop.sqoop.hive.HiveImport;
 import org.apache.hadoop.sqoop.manager.ConnManager;
 import org.apache.hadoop.sqoop.orm.ClassWriter;
 import org.apache.hadoop.sqoop.orm.CompilationManager;
@@ -42,6 +43,7 @@
 
   private ImportOptions options;
   private ConnManager manager;
+  private HiveImport hiveImport;
 
   public Sqoop() {
   }
@@ -69,12 +71,18 @@
     String jarFile = null;
 
     // Generate the ORM code for the tables.
-    // TODO(aaron): Allow this to be bypassed if the user has already generated code
+    // TODO(aaron): Allow this to be bypassed if the user has already generated code,
+    // or if they're using a non-MapReduce import method (e.g., mysqldump).
     jarFile = generateORM(tableName);
 
     if (options.getAction() == ImportOptions.ControlAction.FullImport) {
       // Proceed onward to do the import.
       manager.importTable(tableName, jarFile, getConf());
+
+      // If the user wants this table to be in Hive, perform that post-load.
+      if (options.doHiveImport()) {
+        hiveImport.importTable(tableName);
+      }
     }
   }
 
@@ -101,6 +109,10 @@
       return 1;
     }
 
+    if (options.doHiveImport()) {
+      hiveImport = new HiveImport(options, manager, getConf());
+    }
+
     ImportOptions.ControlAction action = options.getAction();
     if (action == ImportOptions.ControlAction.ListTables) {
       String [] tables = manager.listTables();

Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/HiveImport.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/HiveImport.java?rev=787746&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/HiveImport.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/HiveImport.java Tue Jun 23 16:33:57 2009
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.hive;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.OutputStreamWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.sqoop.ImportOptions;
+import org.apache.hadoop.sqoop.manager.ConnManager;
+import org.apache.hadoop.sqoop.util.Executor;
+import org.apache.hadoop.sqoop.util.LoggingStreamHandlerFactory;
+
+/**
+ * Utility to import a table into the Hive metastore. Manages the connection
+ * to Hive itself as well as orchestrating the use of the other classes in this
+ * package.
+ */
+public class HiveImport {
+
+  public static final Log LOG = LogFactory.getLog(HiveImport.class.getName());
+
+  private ImportOptions options;
+  private ConnManager connManager;
+  private Configuration configuration;
+
+  public HiveImport(final ImportOptions opts, final ConnManager connMgr, final Configuration conf) {
+    this.options = opts;
+    this.connManager = connMgr;
+    this.configuration = conf;
+  }
+
+
+  /** 
+   * @return the filename of the hive executable to run to do the import
+   */
+  private String getHiveBinPath() {
+    // If the user has $HIVE_HOME set, then use $HIVE_HOME/bin/hive if it
+    // exists.
+    // Fall back to just plain 'hive' and hope it's in the path.
+
+    String hiveHome = options.getHiveHome();
+    if (null == hiveHome) {
+      return "hive";
+    }
+
+    Path p = new Path(hiveHome);
+    p = new Path(p, "bin");
+    p = new Path(p, "hive");
+    String hiveBinStr = p.toString();
+    if (new File(hiveBinStr).exists()) {
+      return hiveBinStr;
+    } else {
+      return "hive";
+    }
+  }
+
+  /**
+   * If we used a MapReduce-based upload of the data, remove the _logs dir
+   * from where we put it, before running Hive LOAD DATA INPATH
+   */
+  private void removeTempLogs(String tableName) throws IOException {
+    FileSystem fs = FileSystem.get(configuration);
+    String warehouseDir = options.getWarehouseDir();
+    Path tablePath; 
+    if (warehouseDir != null) {
+      tablePath = new Path(new Path(warehouseDir), tableName);
+    } else {
+      tablePath = new Path(tableName);
+    }
+
+    Path logsPath = new Path(tablePath, "_logs");
+    if (fs.exists(logsPath)) {
+      LOG.info("Removing temporary files from import process: " + logsPath);
+      if (!fs.delete(logsPath, true)) {
+        LOG.warn("Could not delete temporary files; continuing with import, but it may fail.");
+      }
+    }
+  }
+
+  public void importTable(String tableName) throws IOException {
+    removeTempLogs(tableName);
+
+    LOG.info("Loading uploaded data into Hive");
+
+    // For testing purposes against our mock hive implementation, 
+    // if the sysproperty "expected.script" is set, we set the EXPECTED_SCRIPT
+    // environment variable for the child hive process. We also disable
+    // timestamp comments so that we have deterministic table creation scripts.
+    String expectedScript = System.getProperty("expected.script");
+    List<String> env = Executor.getCurEnvpStrings();
+    boolean debugMode = expectedScript != null;
+    if (debugMode) {
+      env.add("EXPECTED_SCRIPT=" + expectedScript);
+      env.add("TMPDIR=" + options.getTempDir());
+    }
+
+    // generate the HQL statements to run.
+    TableDefWriter tableWriter = new TableDefWriter(options, connManager, tableName,
+        configuration, !debugMode);
+    String createTableStr = tableWriter.getCreateTableStmt() + ";\n";
+    String loadDataStmtStr = tableWriter.getLoadDataStmt() + ";\n";
+
+    // write them to a script file.
+    File tempFile = File.createTempFile("hive-script-",".txt", new File(options.getTempDir()));
+    try {
+      String tmpFilename = tempFile.toString();
+      BufferedWriter w = null;
+      try {
+        FileOutputStream fos = new FileOutputStream(tempFile);
+        w = new BufferedWriter(new OutputStreamWriter(fos));
+        w.write(createTableStr, 0, createTableStr.length());
+        w.write(loadDataStmtStr, 0, loadDataStmtStr.length());
+      } catch (IOException ioe) {
+        LOG.error("Error writing Hive load-in script: " + ioe.toString());
+        ioe.printStackTrace();
+        throw ioe;
+      } finally {
+        if (null != w) {
+          try {
+            w.close();
+          } catch (IOException ioe) {
+            LOG.warn("IOException closing stream to Hive script: " + ioe.toString());
+          }
+        }
+      }
+
+      // run Hive on the script and note the return code.
+      String hiveExec = getHiveBinPath();
+      ArrayList<String> args = new ArrayList<String>();
+      args.add(hiveExec);
+      args.add("-f");
+      args.add(tmpFilename);
+
+      LoggingStreamHandlerFactory lshf = new LoggingStreamHandlerFactory(LOG);
+      int ret = Executor.exec(args.toArray(new String[0]), env.toArray(new String[0]), lshf, lshf);
+      if (0 != ret) {
+        throw new IOException("Hive exited with status " + ret);
+      }
+
+      LOG.info("Hive import complete.");
+    } finally {
+      if (!tempFile.delete()) {
+        LOG.warn("Could not remove temporary file: " + tempFile.toString());
+        // try to delete the file later.
+        tempFile.deleteOnExit();
+      }
+    }
+  }
+}
+
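
For reference, the Sqoop.java change earlier in this commit drives this class roughly as in the sketch below. The wrapper class and method names are invented for illustration, the HDFS import itself (manager.importTable(...)) is assumed to have already run, and only APIs whose signatures appear in this diff are used.

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.sqoop.ImportOptions;
    import org.apache.hadoop.sqoop.hive.HiveImport;
    import org.apache.hadoop.sqoop.manager.ConnManager;

    /** Sketch only: mirrors the post-import Hive step added to Sqoop.importTable(). */
    public class HiveImportSketch {
      static void loadIntoHiveIfRequested(ImportOptions options, ConnManager manager,
          Configuration conf, String tableName) throws IOException {
        // New in this commit: after the HDFS import, optionally generate the
        // CREATE TABLE / LOAD DATA INPATH script and run it through bin/hive.
        if (options.doHiveImport()) {
          new HiveImport(options, manager, conf).importTable(tableName);
        }
      }
    }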

Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/HiveTypes.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/HiveTypes.java?rev=787746&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/HiveTypes.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/HiveTypes.java Tue Jun 23 16:33:57 2009
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.hive;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.sql.Types;
+
+/**
+ * Defines conversion between SQL types and Hive types.
+ */
+public class HiveTypes {
+
+  public static final Log LOG = LogFactory.getLog(HiveTypes.class.getName());
+
+  /**
+   * Given JDBC SQL types coming from another database, what is the best
+   * mapping to a Hive-specific type?
+   */
+  public static String toHiveType(int sqlType) {
+    if (sqlType == Types.INTEGER) {
+      return "INT";
+    } else if (sqlType == Types.VARCHAR) {
+      return "STRING";
+    } else if (sqlType == Types.CHAR) {
+      return "STRING";
+    } else if (sqlType == Types.LONGVARCHAR) {
+      return "STRING";
+    } else if (sqlType == Types.NUMERIC) {
+      // Per suggestion on hive-user, this is converted to DOUBLE for now.
+      return "DOUBLE";
+    } else if (sqlType == Types.DECIMAL) {
+      // Per suggestion on hive-user, this is converted to DOUBLE for now.
+      return "DOUBLE";
+    } else if (sqlType == Types.BIT) {
+      return "BOOLEAN";
+    } else if (sqlType == Types.BOOLEAN) {
+      return "BOOLEAN";
+    } else if (sqlType == Types.TINYINT) {
+      return "TINYINT";
+    } else if (sqlType == Types.SMALLINT) {
+      return "INTEGER";
+    } else if (sqlType == Types.BIGINT) {
+      return "BIGINT";
+    } else if (sqlType == Types.REAL) {
+      return "DOUBLE";
+    } else if (sqlType == Types.FLOAT) {
+      return "DOUBLE";
+    } else if (sqlType == Types.DOUBLE) {
+      return "DOUBLE";
+    } else if (sqlType == Types.DATE) {
+      // unfortunate type coercion
+      return "STRING";
+    } else if (sqlType == Types.TIME) {
+      // unfortunate type coercion
+      return "STRING";
+    } else if (sqlType == Types.TIMESTAMP) {
+      // unfortunate type coercion
+      return "STRING";
+    } else {
+      // TODO(aaron): Support BINARY, VARBINARY, LONGVARBINARY, DISTINCT, CLOB,
+      // BLOB, ARRAY, STRUCT, REF, JAVA_OBJECT.
+      return null;
+    }
+  }
+
+  /** 
+   * @return true if a sql type can't be translated to a precise match
+   * in Hive, and we have to cast it to something more generic.
+   */
+  public static boolean isHiveTypeImprovised(int sqlType) {
+    return sqlType == Types.DATE || sqlType == Types.TIME
+        || sqlType == Types.TIMESTAMP
+        || sqlType == Types.DECIMAL
+        || sqlType == Types.NUMERIC;
+  }
+}
+
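
A small illustration of how these mappings behave, using only the two static methods shown above; the class name here is made up, and the expected values in the comments follow the branches of toHiveType().

    import java.sql.Types;
    import org.apache.hadoop.sqoop.hive.HiveTypes;

    public class HiveTypesExample {
      public static void main(String [] args) {
        // Direct mappings:
        System.out.println(HiveTypes.toHiveType(Types.VARCHAR));          // STRING
        System.out.println(HiveTypes.toHiveType(Types.BIGINT));           // BIGINT
        // Lossy coercions, flagged by isHiveTypeImprovised():
        System.out.println(HiveTypes.toHiveType(Types.NUMERIC));          // DOUBLE
        System.out.println(HiveTypes.toHiveType(Types.DATE));             // STRING
        System.out.println(HiveTypes.isHiveTypeImprovised(Types.DATE));   // true
        // Types with no mapping yet return null:
        System.out.println(HiveTypes.toHiveType(Types.BLOB));             // null
      }
    }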

Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/TableDefWriter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/TableDefWriter.java?rev=787746&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/TableDefWriter.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/TableDefWriter.java Tue Jun 23 16:33:57 2009
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.hive;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.hadoop.sqoop.ImportOptions;
+import org.apache.hadoop.sqoop.manager.ConnManager;
+import org.apache.hadoop.sqoop.hive.HiveTypes;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Date;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Creates (Hive-specific) SQL DDL statements to create tables to hold data
+ * we're importing from another source.
+ *
+ * After we import the database into HDFS, we can inject it into Hive using
+ * the CREATE TABLE and LOAD DATA INPATH statements generated by this object.
+ */
+public class TableDefWriter {
+
+  public static final Log LOG = LogFactory.getLog(TableDefWriter.class.getName());
+
+  private ImportOptions options;
+  private ConnManager connManager;
+  private Configuration configuration;
+  private String tableName;
+  private boolean commentsEnabled;
+
+  /**
+   * Creates a new TableDefWriter to generate a Hive CREATE TABLE statement.
+   * @param opts program-wide options
+   * @param connMgr the connection manager used to describe the table.
+   * @param table the name of the table to read.
+   * @param config the Hadoop configuration to use to connect to the dfs
+   * @param withComments if true, then tables will be created with a
+   *        timestamp comment.
+   */
+  public TableDefWriter(final ImportOptions opts, final ConnManager connMgr,
+      final String table, final Configuration config, final boolean withComments) {
+    this.options = opts;
+    this.connManager = connMgr;
+    this.tableName = table;
+    this.configuration = config;
+    this.commentsEnabled = withComments;
+  }
+
+  /**
+   * @return the CREATE TABLE statement for the table to load into hive.
+   */
+  public String getCreateTableStmt() throws IOException {
+    Map<String, Integer> columnTypes = connManager.getColumnTypes(tableName);
+
+    String [] colNames = options.getColumns();
+    if (null == colNames) {
+      colNames = connManager.getColumnNames(tableName);
+    }
+
+    StringBuilder sb = new StringBuilder();
+
+    sb.append("CREATE TABLE " + tableName + " ( ");
+
+    boolean first = true;
+    for (String col : colNames) {
+      if (!first) {
+        sb.append(", ");
+      }
+
+      first = false;
+
+      Integer colType = columnTypes.get(col);
+      String hiveColType = HiveTypes.toHiveType(colType);
+      if (null == hiveColType) {
+        throw new IOException("Hive does not support the SQL type for column " + col);  
+      }
+
+      sb.append(col + " " + hiveColType);
+
+      if (HiveTypes.isHiveTypeImprovised(colType)) {
+        LOG.warn("Column " + col + " had to be cast to a less precise type in Hive");
+      }
+    }
+
+    sb.append(") ");
+
+    if (commentsEnabled) {
+      DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
+      String curDateStr = dateFormat.format(new Date());
+      sb.append("COMMENT 'Imported by sqoop on " + curDateStr + "' ");
+    }
+
+    sb.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' ");
+    sb.append("LINES TERMINATED BY '\\n' STORED AS TEXTFILE");
+
+    LOG.debug("Create statement: " + sb.toString());
+    return sb.toString();
+  }
+
+  private static final int DEFAULT_HDFS_PORT =
+      org.apache.hadoop.hdfs.server.namenode.NameNode.DEFAULT_PORT;
+
+  /**
+   * @return the LOAD DATA statement to import the data in HDFS into hive
+   */
+  public String getLoadDataStmt() throws IOException { 
+    String warehouseDir = options.getWarehouseDir();
+    if (null == warehouseDir) {
+      warehouseDir = "";
+    } else if (!warehouseDir.endsWith(File.separator)) {
+      warehouseDir = warehouseDir + File.separator;
+    }
+
+    String tablePath = warehouseDir + tableName;
+    FileSystem fs = FileSystem.get(configuration);
+    Path finalPath = new Path(tablePath).makeQualified(fs);
+    String finalPathStr = finalPath.toString();
+    if (finalPathStr.startsWith("hdfs://") && finalPathStr.indexOf(":", 7) == -1) {
+      // Hadoop removed the port number from the fully-qualified URL.
+      // We need to reinsert this or else Hive will complain.
+      // Do this right before the third instance of the '/' character.
+      int insertPoint = 0;
+      for (int i = 0; i < 3; i++) {
+        insertPoint = finalPathStr.indexOf("/", insertPoint + 1);
+      }
+
+      if (insertPoint == -1) {
+        LOG.warn("Fully-qualified HDFS path does not contain a port.");
+        LOG.warn("this may cause a Hive error.");
+      } else {
+        finalPathStr = finalPathStr.substring(0, insertPoint) + ":" + DEFAULT_HDFS_PORT
+            + finalPathStr.substring(insertPoint, finalPathStr.length());
+      }
+    }
+
+    StringBuilder sb = new StringBuilder();
+    sb.append("LOAD DATA INPATH '");
+    sb.append(finalPathStr);
+    sb.append("' INTO TABLE ");
+    sb.append(tableName);
+
+    LOG.debug("Load statement: " + sb.toString());
+    return sb.toString();
+  }
+}
+
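
Together with the expected scripts under testdata/hive/scripts (added below), the writer's output looks roughly like the comments in this sketch. The EMPLOYEES table, its columns, and the namenode address are hypothetical, and the options/manager/conf setup (normally done by HiveImport) is assumed.

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.sqoop.ImportOptions;
    import org.apache.hadoop.sqoop.hive.TableDefWriter;
    import org.apache.hadoop.sqoop.manager.ConnManager;

    /** Sketch only: prints the HQL generated for a hypothetical EMPLOYEES table. */
    public class TableDefWriterSketch {
      static void printHql(ImportOptions options, ConnManager manager, Configuration conf)
          throws IOException {
        // withComments=false keeps the output free of the timestamp comment.
        TableDefWriter writer = new TableDefWriter(options, manager, "EMPLOYEES", conf, false);
        System.out.println(writer.getCreateTableStmt());
        // e.g.  CREATE TABLE EMPLOYEES ( ID INT, NAME STRING) ROW FORMAT DELIMITED
        //       FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS TEXTFILE
        System.out.println(writer.getLoadDataStmt());
        // e.g.  LOAD DATA INPATH 'hdfs://namenode:8020/user/foo/EMPLOYEES' INTO TABLE EMPLOYEES
      }
    }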

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ConnManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ConnManager.java?rev=787746&r1=787745&r2=787746&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ConnManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ConnManager.java Tue Jun 23 16:33:57 2009
@@ -76,13 +76,6 @@
   Connection getConnection() throws SQLException;
 
   /**
-   * Resolve a database-specific type to the Java type that should contain it.
-   * @param sqlType
-   * @return the name of a Java type to hold the sql datatype, or null if none.
-   */
-  String toJavaType(int sqlType);
-
-  /**
    * @return a string identifying the driver class to load for this JDBC connection type.
    */
   String getDriverClass();

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java?rev=787746&r1=787745&r2=787746&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java Tue Jun 23 16:33:57 2009
@@ -261,7 +261,12 @@
     // Or must statement.close() be called too?
   }
 
-  public String toJavaType(int sqlType) {
+  /**
+   * Resolve a database-specific type to the Java type that should contain it.
+   * @param sqlType
+   * @return the name of a Java type to hold the sql datatype, or null if none.
+   */
+  public static String toJavaType(int sqlType) {
     // mappings from http://java.sun.com/j2se/1.3/docs/guide/jdbc/getstart/mapping.html
     if (sqlType == Types.INTEGER) {
       return "Integer";

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/orm/ClassWriter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/orm/ClassWriter.java?rev=787746&r1=787745&r2=787746&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/orm/ClassWriter.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/orm/ClassWriter.java Tue Jun 23 16:33:57 2009
@@ -20,6 +20,7 @@
 
 import org.apache.hadoop.sqoop.ImportOptions;
 import org.apache.hadoop.sqoop.manager.ConnManager;
+import org.apache.hadoop.sqoop.manager.SqlManager;
 import org.apache.hadoop.sqoop.lib.BigDecimalSerializer;
 import org.apache.hadoop.sqoop.lib.JdbcWritableBridge;
 
@@ -247,7 +248,7 @@
 
     for (String col : colNames) {
       int sqlType = columnTypes.get(col);
-      String javaType = connManager.toJavaType(sqlType);
+      String javaType = SqlManager.toJavaType(sqlType);
       if (null == javaType) {
         LOG.error("Cannot resolve SQL type " + sqlType);
         continue;
@@ -277,7 +278,7 @@
       fieldNum++;
 
       int sqlType = columnTypes.get(col);
-      String javaType = connManager.toJavaType(sqlType);
+      String javaType = SqlManager.toJavaType(sqlType);
       if (null == javaType) {
         LOG.error("No Java type for SQL type " + sqlType);
         continue;
@@ -314,7 +315,7 @@
       fieldNum++;
 
       int sqlType = columnTypes.get(col);
-      String javaType = connManager.toJavaType(sqlType);
+      String javaType = SqlManager.toJavaType(sqlType);
       if (null == javaType) {
         LOG.error("No Java type for SQL type " + sqlType);
         continue;
@@ -347,7 +348,7 @@
 
     for (String col : colNames) {
       int sqlType = columnTypes.get(col);
-      String javaType = connManager.toJavaType(sqlType);
+      String javaType = SqlManager.toJavaType(sqlType);
       if (null == javaType) {
         LOG.error("No Java type for SQL type " + sqlType);
         continue;
@@ -380,7 +381,7 @@
     boolean first = true;
     for (String col : colNames) {
       int sqlType = columnTypes.get(col);
-      String javaType = connManager.toJavaType(sqlType);
+      String javaType = SqlManager.toJavaType(sqlType);
       if (null == javaType) {
         LOG.error("No Java type for SQL type " + sqlType);
         continue;
@@ -420,7 +421,7 @@
 
     for (String col : colNames) {
       int sqlType = columnTypes.get(col);
-      String javaType = connManager.toJavaType(sqlType);
+      String javaType = SqlManager.toJavaType(sqlType);
       if (null == javaType) {
         LOG.error("No Java type for SQL type " + sqlType);
         continue;

Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/Executor.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/Executor.java?rev=787746&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/Executor.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/Executor.java Tue Jun 23 16:33:57 2009
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Runs a process via Runtime.exec() and allows handling of stdout/stderr to be
+ * deferred to other threads.
+ *
+ */
+public final class Executor {
+  
+  public static final Log LOG = LogFactory.getLog(Executor.class.getName());
+
+  private Executor() {
+  }
+
+  /**
+   * Execute a program defined by the args array with default stream handlers
+   * that consume the program's output (to prevent it from blocking on buffers)
+   * and then ignore said output.
+   */
+  public static int exec(String [] args) throws IOException {
+    NullStreamHandlerFactory f = new NullStreamHandlerFactory();
+    return exec(args, f, f);
+  }
+
+  /**
+   * Run a command via Runtime.exec(), with its stdout and stderr streams
+   * directed to be handled by threads generated by StreamHandlerFactories.
+   * Block until the child process terminates. 
+   *
+   * @return the exit status of the program that was run
+   */
+  public static int exec(String [] args, StreamHandlerFactory outHandler,
+      StreamHandlerFactory errHandler) throws IOException {
+    return exec(args, null, outHandler, errHandler);
+  }
+
+
+  /**
+   * Run a command via Runtime.exec(), with its stdout and stderr streams
+   * directed to be handled by threads generated by StreamHandlerFactories.
+   * Block until the child process terminates. Allows the programmer to
+   * specify an environment for the child program.
+   *
+   * @return the exit status of the program that was run
+   */
+  public static int exec(String [] args, String [] envp, StreamHandlerFactory outHandler,
+      StreamHandlerFactory errHandler) throws IOException {
+
+    // launch the process.
+    Process p = Runtime.getRuntime().exec(args, envp);
+
+    // dispatch its stdout and stderr to stream handlers if available.
+    if (null != outHandler) {
+      outHandler.processStream(p.getInputStream());
+    } 
+
+    if (null != errHandler) {
+      errHandler.processStream(p.getErrorStream());
+    }
+
+    // wait for the return value.
+    while (true) {
+      try {
+        int ret = p.waitFor();
+        return ret;
+      } catch (InterruptedException ie) {
+        continue;
+      }
+    }
+  }
+
+
+  /**
+   * @return a list of "key=value" strings, suitable for conversion to an envp
+   * array, based on the current environment of this program.
+   */
+  public static List<String> getCurEnvpStrings() {
+    Map<String, String> curEnv = System.getenv();
+    ArrayList<String> array = new ArrayList<String>();
+
+    if (null == curEnv) {
+      return null;
+    }
+
+    for (Map.Entry<String, String> entry : curEnv.entrySet()) {
+      array.add(entry.getKey() + "=" + entry.getValue());
+    }
+
+    return array;
+  }
+}
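
HiveImport uses Executor together with the LoggingStreamHandlerFactory added below to run the hive CLI; here is a standalone sketch of the same pattern, with an arbitrary echo command standing in for hive and the class name invented for the example.

    import java.io.IOException;
    import java.util.List;
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.sqoop.util.Executor;
    import org.apache.hadoop.sqoop.util.LoggingStreamHandlerFactory;

    /** Sketch only: run a child process and route its stdout/stderr into commons-logging. */
    public class ExecutorSketch {
      private static final Log LOG = LogFactory.getLog(ExecutorSketch.class.getName());

      public static void main(String [] args) throws IOException {
        // Start from a copy of our own environment and add one extra variable,
        // the same way HiveImport passes EXPECTED_SCRIPT/TMPDIR to the mock hive.
        List<String> env = Executor.getCurEnvpStrings();
        env.add("EXAMPLE_VAR=example-value");

        // Send the child's stdout and stderr to this class's log.
        LoggingStreamHandlerFactory logHandler = new LoggingStreamHandlerFactory(LOG);
        int ret = Executor.exec(new String [] { "echo", "hello from child" },
            env.toArray(new String[0]), logHandler, logHandler);
        if (0 != ret) {
          throw new IOException("Child process exited with status " + ret);
        }
      }
    }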

Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/LoggingStreamHandlerFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/LoggingStreamHandlerFactory.java?rev=787746&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/LoggingStreamHandlerFactory.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/LoggingStreamHandlerFactory.java Tue Jun 23 16:33:57 2009
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.util;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A StreamHandlerFactory that takes the contents of a stream and writes
+ * it to log4j.
+ *
+ */
+public class LoggingStreamHandlerFactory implements StreamHandlerFactory {
+
+  public static final Log LOG = LogFactory.getLog(LoggingStreamHandlerFactory.class.getName());
+
+  private Log contextLog;
+
+  public LoggingStreamHandlerFactory(final Log context) {
+    if (null == context) {
+      this.contextLog = LOG;
+    } else {
+      this.contextLog = context;
+    }
+  }
+
+  public void processStream(InputStream is) {
+    new LoggingThread(is).start();
+  }
+
+  /**
+   * Run a background thread that copies the contents of the stream
+   * to the output context log.
+   */
+  private class LoggingThread extends Thread {
+
+    private InputStream stream;
+
+    LoggingThread(final InputStream is) {
+      this.stream = is;
+    }
+
+    public void run() {
+      InputStreamReader isr = new InputStreamReader(this.stream);
+      BufferedReader r = new BufferedReader(isr);
+
+      try {
+        while (true) {
+          String line = r.readLine();
+          if (null == line) {
+            break; // stream was closed by remote end.
+          }
+
+          LoggingStreamHandlerFactory.this.contextLog.info(line);
+        }
+      } catch (IOException ioe) {
+        LOG.error("IOException reading from stream: " + ioe.toString());
+      }
+
+      try {
+        r.close();
+      } catch (IOException ioe) {
+        LOG.warn("Error closing stream in LoggingStreamHandler: " + ioe.toString());
+      }
+    }
+  }
+}
+

Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/NullStreamHandlerFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/NullStreamHandlerFactory.java?rev=787746&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/NullStreamHandlerFactory.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/NullStreamHandlerFactory.java Tue Jun 23 16:33:57 2009
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.util;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A StreamHandlerFactory that takes the contents of a stream and ignores it.
+ *
+ */
+public class NullStreamHandlerFactory implements StreamHandlerFactory {
+
+  public static final Log LOG = LogFactory.getLog(NullStreamHandlerFactory.class.getName());
+
+  public void processStream(InputStream is) {
+    new IgnoringThread(is).start();
+  }
+
+  /**
+   * Run a background thread that reads and ignores the
+   * contents of the stream.
+   */
+  private class IgnoringThread extends Thread {
+
+    private InputStream stream;
+
+    IgnoringThread(final InputStream is) {
+      this.stream = is;
+    }
+
+    public void run() {
+      InputStreamReader isr = new InputStreamReader(this.stream);
+      BufferedReader r = new BufferedReader(isr);
+
+      try {
+        while (true) {
+          String line = r.readLine();
+          if (null == line) {
+            break; // stream was closed by remote end.
+          }
+        }
+      } catch (IOException ioe) {
+        LOG.warn("IOException reading from (ignored) stream: " + ioe.toString());
+      }
+
+      try {
+        r.close();
+      } catch (IOException ioe) {
+        LOG.warn("Error closing stream in NullStreamHandler: " + ioe.toString());
+      }
+    }
+  }
+}
+

Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/StreamHandlerFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/StreamHandlerFactory.java?rev=787746&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/StreamHandlerFactory.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/util/StreamHandlerFactory.java Tue Jun 23 16:33:57 2009
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.util;
+
+import java.io.InputStream;
+
+/**
+ * An interface describing a factory class for a Thread class that handles
+ * input from some sort of stream.
+ *
+ * When the stream is closed, the thread should terminate.
+ *
+ */
+public interface StreamHandlerFactory {
+  
+  /**
+   * Create and run a thread to handle input from the provided InputStream.
+   * When processStream returns, the thread should be running; it should
+   * continue to run until the InputStream is exhausted.
+   */
+  void processStream(InputStream is);
+}
+

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/AllTests.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/AllTests.java?rev=787746&r1=787745&r2=787746&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/AllTests.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/AllTests.java Tue Jun 23 16:33:57 2009
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.sqoop;
 
+import org.apache.hadoop.sqoop.hive.TestHiveImport;
 import org.apache.hadoop.sqoop.manager.LocalMySQLTest;
 import org.apache.hadoop.sqoop.manager.TestHsqldbManager;
 import org.apache.hadoop.sqoop.manager.TestSqlManager;
@@ -46,6 +47,7 @@
     suite.addTestSuite(TestMultiCols.class);
     suite.addTestSuite(TestOrderBy.class);
     suite.addTestSuite(LocalMySQLTest.class);
+    suite.addTestSuite(TestHiveImport.class);
 
     return suite;
   }

Added: hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/hive/TestHiveImport.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/hive/TestHiveImport.java?rev=787746&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/hive/TestHiveImport.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/hive/TestHiveImport.java Tue Jun 23 16:33:57 2009
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.sqoop.hive;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.hadoop.fs.Path;
+
+import org.apache.hadoop.sqoop.ImportOptions;
+import org.apache.hadoop.sqoop.testutil.HsqldbTestServer;
+import org.apache.hadoop.sqoop.testutil.ImportJobTestCase;
+
+/**
+ * Test HiveImport capability after an import to HDFS.
+ */
+public class TestHiveImport extends ImportJobTestCase {
+
+  public static final Log LOG = LogFactory.getLog(TestHiveImport.class.getName());
+
+  /**
+   * Create the argv to pass to Sqoop
+   * @return the argv as an array of strings.
+   */
+  private String [] getArgv(boolean includeHadoopFlags) {
+    ArrayList<String> args = new ArrayList<String>();
+
+    if (includeHadoopFlags) {
+      args.add("-D");
+      args.add("mapred.job.tracker=local");
+      args.add("-D");
+      args.add("mapred.map.tasks=1");
+      args.add("-D");
+      args.add("fs.default.name=file:///");
+    }
+
+    args.add("--table");
+    args.add(getTableName());
+    args.add("--warehouse-dir");
+    args.add(getWarehouseDir());
+    args.add("--connect");
+    args.add(HsqldbTestServer.getUrl());
+    args.add("--hive-import");
+    args.add("--order-by");
+    args.add(getColNames()[0]);
+
+    return args.toArray(new String[0]);
+  }
+
+  private ImportOptions getImportOptions() {
+    ImportOptions opts = new ImportOptions();
+    try {
+      opts.parse(getArgv(false));
+    } catch (ImportOptions.InvalidOptionsException ioe) {
+      fail("Invalid options: " + ioe.toString());
+    }
+
+    return opts;
+  }
+
+  private void runImportTest(String tableName, String [] types, String [] values,
+      String verificationScript) throws IOException {
+
+    // create a table and populate it with a row...
+    setCurTableName(tableName);
+    createTableWithColTypes(types, values);
+    
+    // set up our mock hive shell to compare our generated script
+    // against the correct expected one.
+    ImportOptions options = getImportOptions();
+    String hiveHome = options.getHiveHome();
+    assertNotNull("hive.home was not set", hiveHome);
+    Path testDataPath = new Path(new Path(hiveHome), "scripts/" + verificationScript);
+    System.setProperty("expected.script", testDataPath.toString());
+
+    // verify that we can import it correctly into hive.
+    runImport(getArgv(true));
+  }
+
+  /** Test that strings and ints are handled in the normal fashion */
+  @Test
+  public void testNormalHiveImport() throws IOException {
+    String [] types = { "VARCHAR(32)", "INTEGER", "CHAR(64)" };
+    String [] vals = { "'test'", "42", "'somestring'" };
+    runImportTest("NORMAL_HIVE_IMPORT", types, vals, "normalImport.q");
+  }
+
+  /** Test that dates are coerced properly to strings */
+  @Test
+  public void testDate() throws IOException {
+    String [] types = { "VARCHAR(32)", "DATE" };
+    String [] vals = { "'test'", "'2009-05-12'" };
+    runImportTest("DATE_HIVE_IMPORT", types, vals, "dateImport.q");
+  }
+
+  /** Test that NUMERICs are coerced to doubles */
+  @Test
+  public void testNumeric() throws IOException {
+    String [] types = { "NUMERIC", "CHAR(64)" };
+    String [] vals = { "3.14159", "'foo'" };
+    runImportTest("NUMERIC_HIVE_IMPORT", types, vals, "numericImport.q");
+  }
+
+  /** If bin/hive returns an error exit status, we should get an IOException */
+  @Test
+  public void testHiveExitFails() {
+    // The expected script is different than the one which would be generated
+    // by this, so we expect an IOException out.
+    String [] types = { "NUMERIC", "CHAR(64)" };
+    String [] vals = { "3.14159", "'foo'" };
+    try {
+      runImportTest("FAILING_HIVE_IMPORT", types, vals, "failingImport.q");
+      // If we get here, then the run succeeded -- which is incorrect.
+      fail("FAILING_HIVE_IMPORT test should have thrown IOException");
+    } catch (IOException ioe) {
+      // expected; ok.
+    }
+  }
+
+}
+

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/testutil/ImportJobTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/testutil/ImportJobTestCase.java?rev=787746&r1=787745&r2=787746&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/testutil/ImportJobTestCase.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/testutil/ImportJobTestCase.java Tue Jun 23 16:33:57 2009
@@ -46,9 +46,6 @@
 /**
  * Class that implements common methods required for tests which import data
  * from SQL into HDFS and verify correct import.
- *
- * 
- *
  */
 public class ImportJobTestCase extends TestCase {
 
@@ -71,6 +68,13 @@
     LOCAL_WAREHOUSE_DIR = TEMP_BASE_DIR + "sqoop/warehouse";
   }
 
+  // Used if a test manually sets the table name to be used.
+  private String curTableName;
+
+  protected void setCurTableName(String curName) {
+    this.curTableName = curName;
+  }
+
   /**
    * Because of how classloading works, we don't actually want to name
    * all the tables the same thing -- they'll actually just use the same
@@ -83,7 +87,11 @@
   static final String TABLE_NAME = "IMPORT_TABLE_";
 
   protected String getTableName() {
-    return TABLE_NAME + Integer.toString(tableNum);
+    if (null != curTableName) {
+      return curTableName;
+    } else {
+      return TABLE_NAME + Integer.toString(tableNum);
+    }
   }
 
   protected String getWarehouseDir() {
@@ -140,12 +148,15 @@
 
   @After
   public void tearDown() {
+    setCurTableName(null); // clear user-override table name.
+
     try {
       manager.close();
     } catch (SQLException sqlE) {
       LOG.error("Got SQLException: " + sqlE.toString());
       fail("Got SQLException: " + sqlE.toString());
     }
+
   }
 
   static final String BASE_COL_NAME = "DATA_COL";
@@ -385,7 +396,9 @@
     }
 
     // expect a successful return.
-    assertEquals("Failure during job", 0, ret);
+    if (0 != ret) {
+      throw new IOException("Failure during job; return status " + ret);
+    }
   }
 
 }

Added: hadoop/mapreduce/trunk/src/contrib/sqoop/testdata/hive/bin/hive
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/testdata/hive/bin/hive?rev=787746&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/testdata/hive/bin/hive (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/testdata/hive/bin/hive Tue Jun 23 16:33:57 2009
@@ -0,0 +1,59 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This is a mock "Hive" shell that validates whether various test imports
+# succeeded. It accepts commands of the form 'hive -f scriptname'
+# and validates that the script contents match those of an expected script.
+# The filename to that expected script is set via the environment variable
+# EXPECTED_SCRIPT.
+
+# The script will contain a pathname as part of the LOAD DATA INPATH statement;
+# depending on where you run the tests from, this can change. So the expected
+# script file actually contains the marker string "BASEPATH" which is replaced
+# by this script with the contents of $TMPDIR, which is set to 'test.build.data'.
+
+if [ -z "$EXPECTED_SCRIPT" ]; then
+  echo "No expected script set"
+  exit 1
+elif [ -z "$TMPDIR" ]; then
+  TMPDIR=/tmp
+elif [ "$1" != "-f" ]; then
+  echo "Misunderstood argument: $1"
+  echo "Expected '-f'."
+  exit 1
+elif [ -z "$2" ]; then
+  echo "Expected: hive -f filename"
+  exit 1
+else
+  GENERATED_SCRIPT=$2
+fi
+
+# Normalize this to an absolute path
+TMPDIR=`cd $TMPDIR && pwd`
+
+# Copy the expected script into the tmpdir and replace the marker.
+cp "$EXPECTED_SCRIPT" "$TMPDIR"
+SCRIPT_BASE=`basename $EXPECTED_SCRIPT`
+COPIED_SCRIPT="$TMPDIR/$SCRIPT_BASE"
+sed -i -e "s|BASEPATH|$TMPDIR|" $COPIED_SCRIPT
+
+# Actually check to see that the input we got matches up.
+diff --ignore-all-space --ignore-blank-lines "$COPIED_SCRIPT" "$GENERATED_SCRIPT"
+ret=$?
+
+exit $ret
+

Added: hadoop/mapreduce/trunk/src/contrib/sqoop/testdata/hive/scripts/dateImport.q
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/testdata/hive/scripts/dateImport.q?rev=787746&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/testdata/hive/scripts/dateImport.q (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/testdata/hive/scripts/dateImport.q Tue Jun 23 16:33:57 2009
@@ -0,0 +1,2 @@
+CREATE TABLE DATE_HIVE_IMPORT ( DATA_COL0 STRING, DATA_COL1 STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS TEXTFILE;
+LOAD DATA INPATH 'file:BASEPATH/sqoop/warehouse/DATE_HIVE_IMPORT' INTO TABLE DATE_HIVE_IMPORT;

Added: hadoop/mapreduce/trunk/src/contrib/sqoop/testdata/hive/scripts/failingImport.q
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/testdata/hive/scripts/failingImport.q?rev=787746&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/testdata/hive/scripts/failingImport.q (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/testdata/hive/scripts/failingImport.q Tue Jun 23 16:33:57 2009
@@ -0,0 +1,2 @@
+CREATE TABLE DATE_HIVE_IMPORT ( DATA_COL0 STRING, DATA_COL1 STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS TEXTFILE;
+LOAD DATA INPATH 'file:BASEPATH/sqoop/warehouse/DATE_HIVE_IMPORT' INTO TABLE DATE_HIVE_IMPORT;

Added: hadoop/mapreduce/trunk/src/contrib/sqoop/testdata/hive/scripts/normalImport.q
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/testdata/hive/scripts/normalImport.q?rev=787746&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/testdata/hive/scripts/normalImport.q (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/testdata/hive/scripts/normalImport.q Tue Jun 23 16:33:57 2009
@@ -0,0 +1,2 @@
+CREATE TABLE NORMAL_HIVE_IMPORT ( DATA_COL0 STRING, DATA_COL1 INT, DATA_COL2 STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS TEXTFILE;
+LOAD DATA INPATH 'file:BASEPATH/sqoop/warehouse/NORMAL_HIVE_IMPORT' INTO TABLE NORMAL_HIVE_IMPORT;

Added: hadoop/mapreduce/trunk/src/contrib/sqoop/testdata/hive/scripts/numericImport.q
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/testdata/hive/scripts/numericImport.q?rev=787746&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/testdata/hive/scripts/numericImport.q (added)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/testdata/hive/scripts/numericImport.q Tue Jun 23 16:33:57 2009
@@ -0,0 +1,2 @@
+CREATE TABLE NUMERIC_HIVE_IMPORT ( DATA_COL0 DOUBLE, DATA_COL1 STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS TEXTFILE;
+LOAD DATA INPATH 'file:BASEPATH/sqoop/warehouse/NUMERIC_HIVE_IMPORT' INTO TABLE NUMERIC_HIVE_IMPORT;


