hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acmur...@apache.org
Subject svn commit: r586006 - in /lucene/hadoop/branches/branch-0.15: ./ src/java/org/apache/hadoop/mapred/pipes/
Date Thu, 18 Oct 2007 15:31:09 GMT
Author: acmurthy
Date: Thu Oct 18 08:31:09 2007
New Revision: 586006

URL: http://svn.apache.org/viewvc?rev=586006&view=rev
Log:
Merge -r 586002:586003 from trunk to branch-0.15 to fix HADOOP-2070.

Modified:
    lucene/hadoop/branches/branch-0.15/CHANGES.txt
    lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/pipes/Application.java
    lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java
    lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/pipes/DownwardProtocol.java
    lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/pipes/PipesMapRunner.java
    lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/pipes/PipesReducer.java

Modified: lucene/hadoop/branches/branch-0.15/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/CHANGES.txt?rev=586006&r1=586005&r2=586006&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.15/CHANGES.txt (original)
+++ lucene/hadoop/branches/branch-0.15/CHANGES.txt Thu Oct 18 08:31:09 2007
@@ -315,6 +315,10 @@
     very uneven splits for applications like distcp that count on them.
     (omalley)
 
+    HADOOP-2070.  Added a flush method to pipes' DownwardProtocol and call
+    that before waiting for the application to finish to ensure all buffered
+    data is flushed. (Owen O'Malley via acmurthy)
+
   IMPROVEMENTS
 
     HADOOP-1908. Restructure data node code so that block sending and 

Modified: lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/pipes/Application.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/pipes/Application.java?rev=586006&r1=586005&r2=586006&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/pipes/Application.java (original)
+++ lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/pipes/Application.java Thu Oct 18 08:31:09 2007
@@ -51,7 +51,7 @@
   private Process process;
   private Socket clientSocket;
   private OutputHandler<K2, V2> handler;
-  private BinaryProtocol<K1, V1, K2, V2> downlink;
+  private DownwardProtocol<K1, V1> downlink;
 
   /**
    * Start the child process to handle the task for us.
@@ -109,6 +109,7 @@
    * @throws Throwable
    */
   boolean waitForFinish() throws Throwable {
+    downlink.flush();
     return handler.waitForFinish();
   }
 
@@ -121,6 +122,7 @@
     LOG.info("Aborting because of " + StringUtils.stringifyException(t));
     try {
       downlink.abort();
+      downlink.flush();
     } catch (IOException e) {
       // IGNORE cleanup problems
     }
@@ -141,7 +143,7 @@
   void cleanup() throws IOException {
     serverSocket.close();
     try {
-      downlink.closeConnection();
+      downlink.close();
     } catch (InterruptedException ie) {
       Thread.currentThread().interrupt();
     }      

Modified: lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java?rev=586006&r1=586005&r2=586006&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java (original)
+++ lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java Thu Oct 18 08:31:09 2007
@@ -226,7 +226,7 @@
    * @throws IOException
    * @throws InterruptedException
    */
-  public void closeConnection() throws IOException, InterruptedException {
+  public void close() throws IOException, InterruptedException {
     LOG.debug("closing connection");
     stream.close();
     uplink.closeConnection();
@@ -291,15 +291,18 @@
     writeObject(value);
   }
 
-  public void close() throws IOException {
+  public void endOfInput() throws IOException {
     WritableUtils.writeVInt(stream, MessageType.CLOSE.code);
     LOG.debug("Sent close command");
-    stream.flush();
   }
   
   public void abort() throws IOException {
     WritableUtils.writeVInt(stream, MessageType.ABORT.code);
     LOG.debug("Sent abort command");
+  }
+
+  public void flush() throws IOException {
+    stream.flush();
   }
 
   /**

Modified: lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/pipes/DownwardProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/pipes/DownwardProtocol.java?rev=586006&r1=586005&r2=586006&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/pipes/DownwardProtocol.java (original)
+++ lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/pipes/DownwardProtocol.java Thu Oct 18 08:31:09 2007
@@ -97,11 +97,21 @@
    * input.
    * @throws IOException
    */
-  void close() throws IOException;
+  void endOfInput() throws IOException;
   
   /**
    * The task should stop as soon as possible, because something has gone wrong.
    * @throws IOException
    */
   void abort() throws IOException;
+  
+  /**
+   * Flush the data through any buffers.
+   */
+  void flush() throws IOException;
+  
+  /**
+   * Close the connection.
+   */
+  void close() throws IOException, InterruptedException;
 }

Modified: lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/pipes/PipesMapRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/pipes/PipesMapRunner.java?rev=586006&r1=586005&r2=586006&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/pipes/PipesMapRunner.java (original)
+++ lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/pipes/PipesMapRunner.java Thu Oct 18 08:31:09 2007
@@ -76,7 +76,7 @@
           // map pair to output
           downlink.mapItem(key, value);
         }
-        downlink.close();
+        downlink.endOfInput();
       }
       application.waitForFinish();
     } catch (Throwable t) {

Modified: lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/pipes/PipesReducer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/pipes/PipesReducer.java?rev=586006&r1=586005&r2=586006&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/pipes/PipesReducer.java (original)
+++ lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/pipes/PipesReducer.java Thu Oct 18 08:31:09 2007
@@ -94,8 +94,9 @@
     }
     try {
       if (isOk) {
-        application.getDownlink().close();
+        application.getDownlink().endOfInput();
       } else {
+        // send the abort to the application and let it clean up
         application.getDownlink().abort();
       }
       LOG.info("waiting for finish");



Mime
View raw message