hadoop-common-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Alexander Schwid (JIRA)" <j...@apache.org>
Subject [jira] Created: (HADOOP-4331) DBOutputFormat: add batch size support for JDBC and receive DBWritable object in value not in key
Date Thu, 02 Oct 2008 09:11:44 GMT
DBOutputFormat: add batch size support for JDBC and receive DBWritable object in value not
in key
--------------------------------------------------------------------------------------------------

                 Key: HADOOP-4331
                 URL: https://issues.apache.org/jira/browse/HADOOP-4331
             Project: Hadoop Core
          Issue Type: Improvement
          Components: mapred
            Reporter: Alexander Schwid
            Priority: Minor
             Fix For: 0.19.0
         Attachments: patch.txt

Add batch size support for JDBC in DBOutputFormat.
Receive the DBWritable object in the value, not in the key, in DBOutputFormat.

---------------patch--------------


Index: src/mapred/org/apache/hadoop/mapred/lib/db/DBConfiguration.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/lib/db/DBConfiguration.java        (revision 701034)
+++ src/mapred/org/apache/hadoop/mapred/lib/db/DBConfiguration.java        (working copy)
@@ -80,6 +80,11 @@
   /** Field names in the Output table */
   public static final String OUTPUT_FIELD_NAMES_PROPERTY = "mapred.jdbc.output.field.names";

+  /** Batch size for output statement */
+  public static final String OUTPUT_BATCH_SIZE = "mapred.jdbc.output.batch.size";
+
+  public static final int DEFAULT_BATCH_SIZE = 1000;
+
   /**
    * Sets the DB access related fields in the JobConf.
    * @param job the job
@@ -212,5 +217,12 @@
     job.setStrings(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY, fieldNames);
   }

+  int getBatchSize() {
+    return job.getInt(DBConfiguration.OUTPUT_BATCH_SIZE, DEFAULT_BATCH_SIZE);
+  }
+
+  void setBatchSize(int sz) {
+    job.setInt(DBConfiguration.OUTPUT_BATCH_SIZE, sz);
+  }
 }

Index: src/mapred/org/apache/hadoop/mapred/lib/db/DBOutputFormat.java
===================================================================
--- src/mapred/org/apache/hadoop/mapred/lib/db/DBOutputFormat.java        (revision 701034)
+++ src/mapred/org/apache/hadoop/mapred/lib/db/DBOutputFormat.java        (working copy)
@@ -37,11 +37,11 @@
  * A OutputFormat that sends the reduce output to a SQL table.
  * <p>
  * {@link DBOutputFormat} accepts &lt;key,value&gt; pairs, where
- * key has a type extending DBWritable. Returned {@link RecordWriter}
- * writes <b>only the key</b> to the database with a batch SQL query.
+ * value has a type extending DBWritable. Returned {@link RecordWriter}
+ * writes <b>only the value</b> to the database with a batch SQL query.
  *
  */
-public class DBOutputFormat<K  extends DBWritable, V>
+public class DBOutputFormat<K, V extends DBWritable>
 implements OutputFormat<K,V> {

   private static final Log LOG = LogFactory.getLog(DBOutputFormat.class);
@@ -54,27 +54,21 @@

     private Connection connection;
     private PreparedStatement statement;
+    private int batch = 0;
+    private int batchSize;

     protected DBRecordWriter(Connection connection
-        , PreparedStatement statement) throws SQLException {
+        , PreparedStatement statement, int batchSize) throws SQLException {
       this.connection = connection;
       this.statement = statement;
       this.connection.setAutoCommit(false);
+      this.batchSize = batchSize;
     }

     /** {@inheritDoc} */
     public void close(Reporter reporter) throws IOException {
       try {
-        statement.executeBatch();
-        connection.commit();
-      } catch (SQLException e) {
-        try {
-          connection.rollback();
-        }
-        catch (SQLException ex) {
-          LOG.warn(StringUtils.stringifyException(ex));
-        }
-        throw new IOException(e.getMessage());
+        executeBatch();
       } finally {
         try {
           statement.close();
@@ -89,12 +83,37 @@
     /** {@inheritDoc} */
     public void write(K key, V value) throws IOException {
       try {
-        key.write(statement);
+        value.write(statement);
         statement.addBatch();
+        batch++;
+        if (batch == batchSize) {
+          executeBatch();
+          batch = 0;
+        }
+
       } catch (SQLException e) {
         e.printStackTrace();
       }
     }
+
+    private void executeBatch() throws IOException {
+      if (batch > 0) {
+        try {
+          statement.executeBatch();
+          connection.commit();
+          statement.clearBatch();
+        }
+        catch(SQLException e) {
+          try {
+            connection.rollback();
+          }
+          catch (SQLException ex) {
+            LOG.warn(StringUtils.stringifyException(ex));
+          }
+          throw new IOException(e.getMessage());
+        }
+      }
+    }
   }

   /**
@@ -129,13 +148,14 @@
     DBConfiguration dbConf = new DBConfiguration(job);
     String tableName = dbConf.getOutputTableName();
     String[] fieldNames = dbConf.getOutputFieldNames();
+    int batchSize = dbConf.getBatchSize();

     try {
       Connection connection = dbConf.getConnection();
       PreparedStatement statement = null;

       statement = connection.prepareStatement(constructQuery(tableName, fieldNames));
-      return new DBRecordWriter(connection, statement);
+      return new DBRecordWriter(connection, statement, batchSize);
     }
     catch (Exception ex) {
       throw new IOException(ex.getMessage());


-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


Mime
View raw message