hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tomwh...@apache.org
Subject svn commit: r906228 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapreduce/lib/db/
Date Wed, 03 Feb 2010 20:30:42 GMT
Author: tomwhite
Date: Wed Feb  3 20:30:41 2010
New Revision: 906228

URL: http://svn.apache.org/viewvc?rev=906228&view=rev
Log:
MAPREDUCE-1443. DBInputFormat can leak connections. Contributed by Aaron Kimball.

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DBInputFormat.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DBRecordReader.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DataDrivenDBInputFormat.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=906228&r1=906227&r2=906228&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Wed Feb  3 20:30:41 2010
@@ -292,6 +292,9 @@
     has right permissions and ownership before performing any actions.
     (Amareshwari Sriramadasu via yhemanth)
 
+    MAPREDUCE-1443. DBInputFormat can leak connections.
+    (Aaron Kimball via tomwhite)
+
 Release 0.21.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DBInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DBInputFormat.java?rev=906228&r1=906227&r2=906228&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DBInputFormat.java
(original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DBInputFormat.java
Wed Feb  3 20:30:41 2010
@@ -149,9 +149,7 @@
     dbConf = new DBConfiguration(conf);
 
     try {
-      this.connection = dbConf.getConnection();
-      this.connection.setAutoCommit(false);
-      connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
+      getConnection();
 
       DatabaseMetaData dbMeta = connection.getMetaData();
       this.dbProductName = dbMeta.getDatabaseProductName().toUpperCase();
@@ -174,6 +172,17 @@
   }
 
   public Connection getConnection() {
+    try {
+      if (null == this.connection) {
+        // The connection was closed; reinstantiate it.
+        this.connection = dbConf.getConnection();
+        this.connection.setAutoCommit(false);
+        this.connection.setTransactionIsolation(
+            Connection.TRANSACTION_SERIALIZABLE);
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
     return connection;
   }
 
@@ -191,15 +200,18 @@
       if (dbProductName.startsWith("ORACLE")) {
         // use Oracle-specific db reader.
         return new OracleDBRecordReader<T>(split, inputClass,
-            conf, connection, getDBConf(), conditions, fieldNames, tableName);
+            conf, getConnection(), getDBConf(), conditions, fieldNames,
+            tableName);
       } else if (dbProductName.startsWith("MYSQL")) {
         // use MySQL-specific db reader.
         return new MySQLDBRecordReader<T>(split, inputClass,
-            conf, connection, getDBConf(), conditions, fieldNames, tableName);
+            conf, getConnection(), getDBConf(), conditions, fieldNames,
+            tableName);
       } else {
         // Generic reader.
         return new DBRecordReader<T>(split, inputClass,
-            conf, connection, getDBConf(), conditions, fieldNames, tableName);
+            conf, getConnection(), getDBConf(), conditions, fieldNames,
+            tableName);
       }
     } catch (SQLException ex) {
       throw new IOException(ex.getMessage());
@@ -251,13 +263,16 @@
       connection.commit();
       return splits;
     } catch (SQLException e) {
+      throw new IOException("Got SQLException", e);
+    } finally {
       try {
         if (results != null) { results.close(); }
       } catch (SQLException e1) {}
       try {
         if (statement != null) { statement.close(); }
       } catch (SQLException e1) {}
-      throw new IOException(e.getMessage());
+
+      closeConnection();
     }
   }
 
@@ -325,4 +340,13 @@
     dbConf.setInputQuery(inputQuery);
     dbConf.setInputCountQuery(inputCountQuery);
   }
+
+  protected void closeConnection() {
+    try {
+      if (null != this.connection) {
+        this.connection.close();
+        this.connection = null;
+      }
+    } catch (SQLException sqlE) { } // ignore exception on close.
+  }
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DBRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DBRecordReader.java?rev=906228&r1=906227&r2=906228&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DBRecordReader.java
(original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DBRecordReader.java
Wed Feb  3 20:30:41 2010
@@ -156,6 +156,7 @@
       }
       if (null != connection) {
         connection.commit();
+        connection.close();
       }
     } catch (SQLException e) {
       throw new IOException(e.getMessage());

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DataDrivenDBInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DataDrivenDBInputFormat.java?rev=906228&r1=906227&r2=906228&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DataDrivenDBInputFormat.java
(original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/db/DataDrivenDBInputFormat.java
Wed Feb  3 20:30:41 2010
@@ -162,8 +162,9 @@
 
     ResultSet results = null;
     Statement statement = null;
+    Connection connection = getConnection();
     try {
-      statement = getConnection().createStatement();
+      statement = connection.createStatement();
 
       results = statement.executeQuery(getBoundingValsQuery());
       results.next();
@@ -199,7 +200,8 @@
       }
 
       try {
-        getConnection().commit();
+        connection.commit();
+        closeConnection();
       } catch (SQLException se) {
         LOG.debug("SQLException committing split transaction: " + se.toString());
       }



Mime
View raw message