hadoop-mapreduce-commits mailing list archives
From cdoug...@apache.org
Subject svn commit: r902696 - in /hadoop/mapreduce/trunk: ./ src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/ src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/orm/ src/contrib/sqoop/src/test...
Date Mon, 25 Jan 2010 03:40:54 GMT
Author: cdouglas
Date: Mon Jan 25 03:40:53 2010
New Revision: 902696

URL: http://svn.apache.org/viewvc?rev=902696&view=rev
Log:
MAPREDUCE-1327. Fix Sqoop handling of Oracle timezone with timestamp data
types in import. Contributed by Leonid Furman
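
The session time zone handling this patch adds to OracleManager and OracleDBRecordReader
follows one reflective pattern: look up OracleConnection.setSessionTimeZone(String) at
runtime so the code builds without the Oracle JDBC driver on the classpath, set the
client's time zone, and fall back to UTC on failure. A minimal standalone sketch of that
pattern (the helper class name is hypothetical; the UTC fallback mirrors the patch):

    import java.lang.reflect.Method;
    import java.sql.Connection;
    import java.sql.SQLException;
    import java.util.TimeZone;

    public final class OracleSessionTimeZoneSketch {

      private OracleSessionTimeZoneSketch() { }

      // Invoke OracleConnection.setSessionTimeZone(String) reflectively so this
      // compiles without the Oracle JDBC driver on the classpath.
      public static void apply(Connection conn) throws SQLException {
        Method method;
        try {
          method = conn.getClass().getMethod("setSessionTimeZone", String.class);
        } catch (Exception ex) {
          throw new SQLException(ex);
        }

        // The session time zone must match the client for columns of type
        // "TIMESTAMP WITH LOCAL TIME ZONE" to be read correctly.
        String clientTimeZone = TimeZone.getDefault().getID();
        try {
          method.setAccessible(true);
          method.invoke(conn, clientTimeZone);
        } catch (Exception ex) {
          try {
            method.invoke(conn, "UTC");   // fall back to UTC, as the patch does
          } catch (Exception ex2) {
            throw new SQLException(ex2);
          }
        }
      }
    }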

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/TableDefWriter.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ConnManager.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/orm/ClassWriter.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestConnFactory.java
    hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/OracleManagerTest.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/OracleDBRecordReader.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=902696&r1=902695&r2=902696&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Mon Jan 25 03:40:53 2010
@@ -244,6 +244,9 @@
     MAPREDUCE-1313. Fix NPE in Sqoop when table with null fields uses escape
     during import. (Aaron Kimball via cdouglas)
 
+    MAPREDUCE-1327. Fix Sqoop handling of Oracle timezone with timestamp data
+    types in import. (Leonid Furman via cdouglas)
+
 Release 0.21.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/TableDefWriter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/TableDefWriter.java?rev=902696&r1=902695&r2=902696&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/TableDefWriter.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/hive/TableDefWriter.java Mon Jan 25 03:40:53 2010
@@ -24,7 +24,6 @@
 
 import org.apache.hadoop.sqoop.SqoopOptions;
 import org.apache.hadoop.sqoop.manager.ConnManager;
-import org.apache.hadoop.sqoop.hive.HiveTypes;
 
 import java.io.File;
 import java.io.IOException;
@@ -95,9 +94,9 @@
       first = false;
 
       Integer colType = columnTypes.get(col);
-      String hiveColType = HiveTypes.toHiveType(colType);
+      String hiveColType = connManager.toHiveType(colType);
       if (null == hiveColType) {
-        throw new IOException("Hive does not support the SQL type for column " + col);  
+        throw new IOException("Hive does not support the SQL type for column " + col);
       }
 
       sb.append(col + " " + hiveColType);

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ConnManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ConnManager.java?rev=902696&r1=902695&r2=902696&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ConnManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/ConnManager.java Mon Jan 25 03:40:53 2010
@@ -57,6 +57,20 @@
   public abstract String getPrimaryKey(String tableName);
 
   /**
+   * Return java type for SQL type
+   * @param sqlType     sql type
+   * @return            java type
+   */
+  public abstract String toJavaType(int sqlType);
+
+  /**
+   * Return hive type for SQL type
+   * @param sqlType   sql type
+   * @return          hive type
+   */
+  public abstract String toHiveType(int sqlType);
+
+  /**
    * Return an unordered mapping from colname to sqltype for
    * all columns in a table.
    *

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java?rev=902696&r1=902695&r2=902696&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/OracleManager.java Mon Jan 25 03:40:53 2010
@@ -23,7 +23,10 @@
 import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Types;
 import java.util.ArrayList;
+import java.util.TimeZone;
+import java.lang.reflect.Method;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -83,10 +86,52 @@
     // We only use this for metadata queries. Loosest semantics are okay.
     connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
 
+    // Setting session time zone
+    setSessionTimeZone(connection);
+
     return connection;
   }
 
   /**
+   * Set session time zone
+   * @param conn      Connection object
+   * @throws SQLException if the session time zone cannot be set
+   */
+  private void setSessionTimeZone(Connection conn) throws SQLException {
+    // need to use reflection to call the method setSessionTimeZone on the OracleConnection class
+    // because oracle specific java libraries are not accessible in this context
+    Method method;
+    try {
+      method = conn.getClass().getMethod(
+              "setSessionTimeZone", new Class [] {String.class});
+    } catch (Exception ex) {
+      LOG.error("Could not find method setSessionTimeZone in " + conn.getClass().getName(),
ex);
+      // rethrow SQLException
+      throw new SQLException(ex);
+    }
+
+    // Need to set the time zone in order for Java
+    // to correctly access the column "TIMESTAMP WITH LOCAL TIME ZONE"
+    String clientTimeZone = TimeZone.getDefault().getID();
+    try {
+      method.setAccessible(true);
+      method.invoke(conn, clientTimeZone);
+      LOG.info("Time zone has been set");
+    } catch (Exception ex) {
+      LOG.warn("Time zone " + clientTimeZone +
+               " could not be set on oracle database.");
+      LOG.info("Setting default time zone: UTC");
+      try {
+        method.invoke(conn, "UTC");
+      } catch (Exception ex2) {
+        LOG.error("Could not set time zone for oracle connection", ex2);
+        // rethrow SQLException
+        throw new SQLException(ex2);
+      }
+    }
+  }
+
+  /**
    * This importTable() implementation continues to use the older DBInputFormat
    * because DataDrivenDBInputFormat does not currently work with Oracle.
    */
@@ -111,5 +156,117 @@
 
     importer.runImport(tableName, jarFile, splitCol, options.getConf());
   }
+
+  /**
+   * Resolve a database-specific type to the Java type that should contain it.
+   * @param sqlType
+   * @return the name of a Java type to hold the sql datatype, or null if none.
+   */
+  public String toJavaType(int sqlType) {
+    String defaultJavaType = super.toJavaType(sqlType);
+    return (defaultJavaType == null) ? dbToJavaType(sqlType) : defaultJavaType;
+  }
+
+  /**
+   * Attempt to map sql type to java type
+   * @param sqlType     sql type
+   * @return            java type
+   */
+  private String dbToJavaType(int sqlType) {
+    // load class oracle.jdbc.OracleTypes
+    // need to use reflection because oracle specific libraries
+    // are not accessible in this context
+    Class typeClass = getTypeClass("oracle.jdbc.OracleTypes");
+
+    // check if it is TIMESTAMPTZ
+    int dbType = getDatabaseType(typeClass, "TIMESTAMPTZ");
+    if (sqlType == dbType) {
+      return "java.sql.Timestamp";
+    }
+
+    // check if it is TIMESTAMPLTZ
+    dbType = getDatabaseType(typeClass, "TIMESTAMPLTZ");
+    if (sqlType == dbType) {
+      return "java.sql.Timestamp";
+    }
+
+    // return null if no java type was found for sqlType
+    return null;
+  }
+    
+  /**
+   * Attempt to map sql type to hive type
+   * @param sqlType     sql data type
+   * @return            hive data type
+   */
+  public String toHiveType(int sqlType) {
+    String defaultHiveType = super.toHiveType(sqlType);
+    return (defaultHiveType == null) ? dbToHiveType(sqlType) : defaultHiveType;
+  }
+
+  /**
+   * Resolve a database-specific type to Hive type
+   * @param sqlType     sql type
+   * @return            hive type
+   */
+  private String dbToHiveType(int sqlType) {
+    // load class oracle.jdbc.OracleTypes
+    // need to use reflection because oracle specific libraries
+    // are not accessible in this context
+    Class typeClass = getTypeClass("oracle.jdbc.OracleTypes");
+
+    // check if it is TIMESTAMPTZ
+    int dbType = getDatabaseType(typeClass, "TIMESTAMPTZ");
+    if (sqlType == dbType) {
+      return "STRING";
+    }
+
+    // check if it is TIMESTAMPLTZ
+    dbType = getDatabaseType(typeClass, "TIMESTAMPLTZ");
+    if (sqlType == dbType) {
+      return "STRING";
+    }
+
+    // return null if no hive type was found for sqlType
+    return null;
+  }
+
+  /**
+   * Get database type
+   * @param clazz         oracle class representing sql types
+   * @param fieldName     field name
+   * @return              value of database type constant
+   */
+  private int getDatabaseType(Class clazz, String fieldName) {
+    // need to use reflection to extract constant values
+    // because the database specific java libraries are not accessible in this context
+    int value = -1;
+    try {
+      java.lang.reflect.Field field = clazz.getDeclaredField(fieldName);
+      value = field.getInt(null);
+    } catch (NoSuchFieldException ex) {
+      LOG.error("Could not retrieve value for field " + fieldName, ex);
+    } catch (IllegalAccessException ex) {
+      LOG.error("Could not retrieve value for field " + fieldName, ex);
+    }
+    return value;
+  }
+
+  /**
+   * Load class by name
+   * @param className     class name
+   * @return              class instance
+   */
+  private Class getTypeClass(String className) {
+    // need to use reflection to load class
+    // because the database specific java libraries are not accessible in this context
+    Class typeClass = null;
+    try {
+      typeClass = Class.forName(className);
+    } catch (ClassNotFoundException ex) {
+      LOG.error("Could not load class " + className, ex);
+    }
+    return typeClass;
+  }
 }
 

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java?rev=902696&r1=902695&r2=902696&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/manager/SqlManager.java Mon Jan 25 03:40:53 2010
@@ -19,6 +19,7 @@
 package org.apache.hadoop.sqoop.manager;
 
 import org.apache.hadoop.sqoop.SqoopOptions;
+import org.apache.hadoop.sqoop.hive.HiveTypes;
 import org.apache.hadoop.sqoop.mapreduce.DataDrivenImportJob;
 import org.apache.hadoop.sqoop.mapreduce.ExportJob;
 import org.apache.hadoop.sqoop.util.ExportException;
@@ -308,7 +309,7 @@
    * @param sqlType
    * @return the name of a Java type to hold the sql datatype, or null if none.
    */
-  public static String toJavaType(int sqlType) {
+  public String toJavaType(int sqlType) {
     // mappings from http://java.sun.com/j2se/1.3/docs/guide/jdbc/getstart/mapping.html
     if (sqlType == Types.INTEGER) {
       return "Integer";
@@ -347,10 +348,19 @@
     } else {
       // TODO(aaron): Support BINARY, VARBINARY, LONGVARBINARY, DISTINCT, CLOB, BLOB, ARRAY,
       // STRUCT, REF, JAVA_OBJECT.
+      // return database specific java data type
       return null;
     }
   }
 
+  /**
+   * Resolve a database-specific type to Hive data type
+   * @param sqlType     sql type
+   * @return            hive type
+   */
+  public String toHiveType(int sqlType) {
+    return HiveTypes.toHiveType(sqlType);
+  }
 
   public void close() throws SQLException {
   }

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/orm/ClassWriter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/orm/ClassWriter.java?rev=902696&r1=902695&r2=902696&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/orm/ClassWriter.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/java/org/apache/hadoop/sqoop/orm/ClassWriter.java Mon Jan 25 03:40:53 2010
@@ -376,7 +376,7 @@
 
     for (String col : colNames) {
       int sqlType = columnTypes.get(col);
-      String javaType = SqlManager.toJavaType(sqlType);
+      String javaType = connManager.toJavaType(sqlType);
       if (null == javaType) {
         LOG.error("Cannot resolve SQL type " + sqlType);
         continue;
@@ -406,7 +406,7 @@
       fieldNum++;
 
       int sqlType = columnTypes.get(col);
-      String javaType = SqlManager.toJavaType(sqlType);
+      String javaType = connManager.toJavaType(sqlType);
       if (null == javaType) {
         LOG.error("No Java type for SQL type " + sqlType);
         continue;
@@ -443,7 +443,7 @@
       fieldNum++;
 
       int sqlType = columnTypes.get(col);
-      String javaType = SqlManager.toJavaType(sqlType);
+      String javaType = connManager.toJavaType(sqlType);
       if (null == javaType) {
         LOG.error("No Java type for SQL type " + sqlType);
         continue;
@@ -476,7 +476,7 @@
 
     for (String col : colNames) {
       int sqlType = columnTypes.get(col);
-      String javaType = SqlManager.toJavaType(sqlType);
+      String javaType = connManager.toJavaType(sqlType);
       if (null == javaType) {
         LOG.error("No Java type for SQL type " + sqlType);
         continue;
@@ -532,7 +532,7 @@
     boolean first = true;
     for (String col : colNames) {
       int sqlType = columnTypes.get(col);
-      String javaType = SqlManager.toJavaType(sqlType);
+      String javaType = connManager.toJavaType(sqlType);
       if (null == javaType) {
         LOG.error("No Java type for SQL type " + sqlType);
         continue;
@@ -597,7 +597,7 @@
   private void parseColumn(String colName, int colType, StringBuilder sb) {
     // assume that we have __it and __cur_str vars, based on __loadFromFields() code.
     sb.append("    __cur_str = __it.next();\n");
-    String javaType = SqlManager.toJavaType(colType);
+    String javaType = connManager.toJavaType(colType);
 
     parseNullVal(colName, sb);
     if (javaType.equals("String")) {
@@ -690,7 +690,7 @@
 
     for (String col : colNames) {
       int sqlType = columnTypes.get(col);
-      String javaType = SqlManager.toJavaType(sqlType);
+      String javaType = connManager.toJavaType(sqlType);
       if (null == javaType) {
         LOG.error("No Java type for SQL type " + sqlType);
         continue;
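
The change above from the static SqlManager.toJavaType() to the instance method
connManager.toJavaType() is what allows a database-specific manager such as OracleManager
to supply mappings for vendor types like TIMESTAMPTZ and TIMESTAMPLTZ. A rough standalone
sketch of the constant lookup OracleManager performs (the class name is hypothetical; the
lookup only returns a real value with the Oracle JDBC driver on the classpath):

    import java.lang.reflect.Field;

    public final class OracleTypeLookupSketch {

      private OracleTypeLookupSketch() { }

      // Read a public static int constant such as oracle.jdbc.OracleTypes.TIMESTAMPTZ
      // without a compile-time dependency on the Oracle driver.
      public static int lookup(String className, String fieldName) {
        try {
          Class<?> typeClass = Class.forName(className);
          Field field = typeClass.getDeclaredField(fieldName);
          return field.getInt(null);
        } catch (Exception ex) {
          return -1;   // no match possible when the driver is absent
        }
      }

      public static void main(String[] args) {
        System.out.println("TIMESTAMPTZ = "
            + lookup("oracle.jdbc.OracleTypes", "TIMESTAMPTZ"));
        System.out.println("TIMESTAMPLTZ = "
            + lookup("oracle.jdbc.OracleTypes", "TIMESTAMPLTZ"));
      }
    }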

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestConnFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestConnFactory.java?rev=902696&r1=902695&r2=902696&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestConnFactory.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/TestConnFactory.java Mon Jan 25 03:40:53 2010
@@ -113,6 +113,24 @@
       return null;
     }
 
+    /**
+    * Default implementation
+    * @param sqlType     sql data type
+    * @return            java data type
+    */
+    public String toJavaType(int sqlType) {
+      return null;
+    }
+
+    /**
+    * Default implementation
+    * @param sqlType     sql data type
+    * @return            hive data type
+    */
+    public String toHiveType(int sqlType) {
+      return null;
+    }
+
     public Map<String, Integer> getColumnTypes(String tableName) {
       return null;
     }

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/OracleManagerTest.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/OracleManagerTest.java?rev=902696&r1=902695&r2=902696&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/OracleManagerTest.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/src/test/org/apache/hadoop/sqoop/manager/OracleManagerTest.java Mon Jan 25 03:40:53 2010
@@ -27,7 +27,13 @@
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.Date;
+import java.util.Calendar;
+import java.util.TimeZone;
 import java.util.ArrayList;
+import java.text.ParseException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
 
 import junit.framework.TestCase;
 
@@ -101,14 +107,16 @@
           + "start_date DATE, "
           + "salary FLOAT, "
           + "dept VARCHAR2(32), "
+          + "timestamp_tz TIMESTAMP WITH TIME ZONE, "
+          + "timestamp_ltz TIMESTAMP WITH LOCAL TIME ZONE, "
           + "PRIMARY KEY (id))");
 
       st.executeUpdate("INSERT INTO " + TABLE_NAME + " VALUES("
-          + "1,'Aaron',to_date('2009-05-14','yyyy-mm-dd'),1000000.00,'engineering')");
+          + "1,'Aaron',to_date('2009-05-14','yyyy-mm-dd'),1000000.00,'engineering','29-DEC-09
12.00.00.000000000 PM','29-DEC-09 12.00.00.000000000 PM')");
       st.executeUpdate("INSERT INTO " + TABLE_NAME + " VALUES("
-          + "2,'Bob',to_date('2009-04-20','yyyy-mm-dd'),400.00,'sales')");
+          + "2,'Bob',to_date('2009-04-20','yyyy-mm-dd'),400.00,'sales','30-DEC-09 12.00.00.000000000
PM','30-DEC-09 12.00.00.000000000 PM')");
       st.executeUpdate("INSERT INTO " + TABLE_NAME + " VALUES("
-          + "3,'Fred',to_date('2009-01-23','yyyy-mm-dd'),15.00,'marketing')");
+          + "3,'Fred',to_date('2009-01-23','yyyy-mm-dd'),15.00,'marketing','31-DEC-09 12.00.00.000000000
PM','31-DEC-09 12.00.00.000000000 PM')");
       connection.commit();
     } catch (SQLException sqlE) {
       LOG.error("Encountered SQL Exception: " + sqlE);
@@ -180,7 +188,7 @@
       ioe.printStackTrace();
       fail(ioe.toString());
     }
-
+      
     File f = new File(filePath.toString());
     assertTrue("Could not find imported data file", f.exists());
     BufferedReader r = null;
@@ -188,7 +196,7 @@
       // Read through the file and make sure it's all there.
       r = new BufferedReader(new InputStreamReader(new FileInputStream(f)));
       for (String expectedLine : expectedResults) {
-        assertEquals(expectedLine, r.readLine());
+        compareRecords(expectedLine, r.readLine());
       }
     } catch (IOException ioe) {
       LOG.error("Got IOException verifying results: " + ioe.toString());
@@ -208,11 +216,80 @@
     // a strict DATE type. Thus we include HH:MM:SS.mmmmm below.
     // See http://www.oracle.com/technology/tech/java/sqlj_jdbc/htdocs/jdbc_faq.html#08_01
     String [] expectedResults = {
-        "1,Aaron,2009-05-14 00:00:00.0,1000000,engineering",
-        "2,Bob,2009-04-20 00:00:00.0,400,sales",
-        "3,Fred,2009-01-23 00:00:00.0,15,marketing"
+        "1,Aaron,2009-05-14 00:00:00.0,1000000,engineering,2009-12-29 12:00:00.0,2009-12-29
12:00:00.0",
+        "2,Bob,2009-04-20 00:00:00.0,400,sales,2009-12-30 12:00:00.0,2009-12-30 12:00:00.0",
+        "3,Fred,2009-01-23 00:00:00.0,15,marketing,2009-12-31 12:00:00.0,2009-12-31 12:00:00.0"
     };
 
     runOracleTest(expectedResults);
   }
+
+  /**
+   * Compare two lines
+   * @param expectedLine    expected line
+   * @param receivedLine    received line
+   * @throws IOException    if the expected and received lines do not match
+   */
+  private void compareRecords(String expectedLine, String receivedLine) throws IOException {
+    // handle null case
+    if (expectedLine == null || receivedLine == null) {
+      return;
+    }
+
+    // check if lines are equal
+    if (expectedLine.equals(receivedLine)) {
+      return;
+    }
+
+    // check if size is the same
+    String [] expectedValues = expectedLine.split(",");
+    String [] receivedValues = receivedLine.split(",");
+    if (expectedValues.length != 7 || receivedValues.length != 7) {
+      LOG.error("Number of expected fields did not match number of received fields");
+      throw new IOException("Number of expected fields did not match number of received fields");
+    }
+
+    // check first 5 values
+    boolean mismatch = false;
+    for (int i = 0; !mismatch && i < 5; i++) {
+      mismatch = !expectedValues[i].equals(receivedValues[i]);
+    }
+    if (mismatch) {
+      throw new IOException("Expected:<" + expectedLine + "> but was:<" + receivedLine
+ ">");
+    }
+
+    Date expectedDate = null;
+    Date receivedDate = null;
+    DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.S");
+    int offset = TimeZone.getDefault().getOffset(System.currentTimeMillis()) / 3600000;
+    for (int i = 5; i < 7; i++) {
+      // parse expected timestamp
+      try {
+        expectedDate = df.parse(expectedValues[i]);
+      } catch (ParseException ex) {
+        LOG.error("Could not parse expected timestamp: " + expectedValues[i]);
+        throw new IOException("Could not parse expected timestamp: " + expectedValues[i]);
+      }
+
+      // parse received timestamp
+      try {
+        receivedDate = df.parse(receivedValues[i]);
+      } catch (ParseException ex) {
+        LOG.error("Could not parse received timestamp: " + receivedValues[i]);
+        throw new IOException("Could not parse received timestamp: " + receivedValues[i]);
+      }
+
+      // compare two timestamps considering timezone offset
+      Calendar expectedCal = Calendar.getInstance();
+      expectedCal.setTime(expectedDate);
+      expectedCal.add(Calendar.HOUR, offset);
+
+      Calendar receivedCal = Calendar.getInstance();
+      receivedCal.setTime(receivedDate);
+
+      if (!expectedCal.equals(receivedCal)) {
+        throw new IOException("Expected:<" + expectedLine + "> but was:<" + receivedLine
+ ">, while timezone offset is: " + offset);
+      }
+    }
+  }
 }
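
The new compareRecords() helper above compares the two timestamp fields after shifting the
expected values by the local UTC offset, since the values Sqoop reads back reflect the
Oracle session time zone. A condensed, standalone sketch of that comparison under the same
assumptions (whole-hour offsets and the "yyyy-MM-dd HH:mm:ss.S" format; class and method
names are hypothetical):

    import java.text.DateFormat;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Calendar;
    import java.util.TimeZone;

    public final class TimestampCompareSketch {

      private TimestampCompareSketch() { }

      // Returns true when the received timestamp equals the expected one after the
      // expected value is shifted by the local UTC offset, as compareRecords() does.
      public static boolean sameInstant(String expected, String received)
          throws ParseException {
        DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.S");
        int offsetHours =
            TimeZone.getDefault().getOffset(System.currentTimeMillis()) / 3600000;

        Calendar expectedCal = Calendar.getInstance();
        expectedCal.setTime(df.parse(expected));
        expectedCal.add(Calendar.HOUR, offsetHours);

        Calendar receivedCal = Calendar.getInstance();
        receivedCal.setTime(df.parse(received));

        return expectedCal.equals(receivedCal);
      }
    }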

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/OracleDBRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/OracleDBRecordReader.java?rev=902696&r1=902695&r2=902696&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/OracleDBRecordReader.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/OracleDBRecordReader.java Mon Jan 25 03:40:53 2010
@@ -23,18 +23,25 @@
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.TimeZone;
+import java.lang.reflect.Method;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * A RecordReader that reads records from an Oracle SQL table.
  */
 public class OracleDBRecordReader<T extends DBWritable> extends DBRecordReader<T> {
 
+  private static final Log LOG = LogFactory.getLog(OracleDBRecordReader.class);
+
   public OracleDBRecordReader(DBInputFormat.DBInputSplit split, 
       Class<T> inputClass, Configuration conf, Connection conn, DBConfiguration dbConfig,
       String cond, String [] fields, String table) throws SQLException {
     super(split, inputClass, conf, conn, dbConfig, cond, fields, table);
+    setSessionTimeZone(conn);
   }
 
   /** Returns the query for selecting the records from an Oracle DB. */
@@ -86,4 +93,43 @@
 
     return query.toString();
   }
+
+  /**
+   * Set session time zone
+   * @param conn      Connection object
+   * @throws SQLException if the session time zone cannot be set
+   */
+  private void setSessionTimeZone(Connection conn) throws SQLException {
+    // need to use reflection to call the method setSessionTimeZone on the OracleConnection class
+    // because oracle specific java libraries are not accessible in this context
+    Method method;
+    try {
+      method = conn.getClass().getMethod(
+              "setSessionTimeZone", new Class [] {String.class});
+    } catch (Exception ex) {
+      LOG.error("Could not find method setSessionTimeZone in " + conn.getClass().getName(),
ex);
+      // rethrow SQLException
+      throw new SQLException(ex);
+    }
+
+    // Need to set the time zone in order for Java
+    // to correctly access the column "TIMESTAMP WITH LOCAL TIME ZONE"
+    String clientTimeZone = TimeZone.getDefault().getID();
+    try {
+      method.setAccessible(true);
+      method.invoke(conn, clientTimeZone);
+      LOG.info("Time zone has been set");
+    } catch (Exception ex) {
+      LOG.warn("Time zone " + clientTimeZone +
+               " could not be set on oracle database.");
+      LOG.info("Setting default time zone: UTC");
+      try {
+        method.invoke(conn, "UTC");
+      } catch (Exception ex2) {
+        LOG.error("Could not set time zone for oracle connection", ex2);
+        // rethrow SQLException
+        throw new SQLException(ex2);
+      }
+    }
+  }
 }


