hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From omal...@apache.org
Subject svn commit: r585234 - in /lucene/hadoop/branches/branch-0.15: CHANGES.txt src/c++/pipes/impl/HadoopPipes.cc src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java
Date Tue, 16 Oct 2007 19:50:02 GMT
Author: omalley
Date: Tue Oct 16 12:50:00 2007
New Revision: 585234

URL: http://svn.apache.org/viewvc?rev=585234&view=rev
Log:
Merge -r 585219:585220 from trunk to branch 15 to fix HADOOP-1788.

Modified:
    lucene/hadoop/branches/branch-0.15/CHANGES.txt
    lucene/hadoop/branches/branch-0.15/src/c++/pipes/impl/HadoopPipes.cc
    lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java

Modified: lucene/hadoop/branches/branch-0.15/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/CHANGES.txt?rev=585234&r1=585233&r2=585234&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.15/CHANGES.txt (original)
+++ lucene/hadoop/branches/branch-0.15/CHANGES.txt Tue Oct 16 12:50:00 2007
@@ -110,6 +110,9 @@
     HADOOP-1774. Remove use of INode.parent in Block CRC upgrade.
     (Raghu Angadi via dhruba)
 
+    HADOOP-1788.  Increase the buffer size on the Pipes command socket.
+    (Amareshwari Sri Ramadasu and Christian Kunz via omalley)
+
   BUG FIXES
 
     HADOOP-1946.  The Datanode code does not need to invoke du on

Modified: lucene/hadoop/branches/branch-0.15/src/c++/pipes/impl/HadoopPipes.cc
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/src/c%2B%2B/pipes/impl/HadoopPipes.cc?rev=585234&r1=585233&r2=585234&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.15/src/c++/pipes/impl/HadoopPipes.cc (original)
+++ lucene/hadoop/branches/branch-0.15/src/c++/pipes/impl/HadoopPipes.cc Tue Oct 16 12:50:00 2007
@@ -855,6 +855,8 @@
       int sock = -1;
       FILE* stream = NULL;
       FILE* outStream = NULL;
+      char *bufin = NULL;
+      char *bufout = NULL;
       if (portStr) {
         sock = socket(PF_INET, SOCK_STREAM, 0);
         HADOOP_ASSERT(sock != - 1,
@@ -866,8 +868,22 @@
         HADOOP_ASSERT(connect(sock, (sockaddr*) &addr, sizeof(addr)) == 0,
                       string("problem connecting command socket: ") +
                       strerror(errno));
+
         stream = fdopen(sock, "r");
         outStream = fdopen(sock, "w");
+
+        // increase buffer size
+        int bufsize = 128*1024;
+        int setbuf;
+        bufin = new char[bufsize];
+        bufout = new char[bufsize];
+        setbuf = setvbuf(stream, bufin, _IOFBF, bufsize);
+        HADOOP_ASSERT(setbuf == 0, string("problem with setvbuf for inStream: ")
+                                     + strerror(errno));
+        setbuf = setvbuf(outStream, bufout, _IOFBF, bufsize);
+        HADOOP_ASSERT(setbuf == 0, string("problem with setvbuf for outStream: ")
+                                     + strerror(errno));
+
         connection = new BinaryProtocol(stream, context, outStream);
       } else if (getenv("hadoop.pipes.command.file")) {
         char* filename = getenv("hadoop.pipes.command.file");
@@ -907,6 +923,8 @@
       if (outStream != NULL) {
         //fclose(outStream);
       } 
+      delete bufin;
+      delete bufout;
       return true;
     } catch (Error& err) {
       fprintf(stderr, "Hadoop Pipes Exception: %s\n", 

Modified: lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java?rev=585234&r1=585233&r2=585234&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java (original)
+++ lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java Tue Oct 16 12:50:00 2007
@@ -44,6 +44,11 @@
   implements DownwardProtocol<K1, V1> {
   
   public static final int CURRENT_PROTOCOL_VERSION = 0;
+  /**
+   * The buffer size for the command socket
+   */
+  private static final int BUFFER_SIZE = 128*1024;
+
   private DataOutputStream stream;
   private DataOutputBuffer buffer = new DataOutputBuffer();
   private static final Log LOG = 
@@ -87,7 +92,8 @@
     public UplinkReaderThread(InputStream stream,
                               UpwardProtocol<K2, V2> handler, 
                               K2 key, V2 value) throws IOException{
-      inStream = new DataInputStream(stream);
+      inStream = new DataInputStream(new BufferedInputStream(stream, 
+                                                             BUFFER_SIZE));
       this.handler = handler;
       this.key = key;
       this.value = value;
@@ -207,7 +213,8 @@
     if (Submitter.getKeepCommandFile(config)) {
       raw = new TeeOutputStream("downlink.data", raw);
     }
-    stream = new DataOutputStream(raw);
+    stream = new DataOutputStream(new BufferedOutputStream(raw, 
+                                                           BUFFER_SIZE)) ;
     uplink = new UplinkReaderThread<K2, V2>(sock.getInputStream(),
                                             handler, key, value);
     uplink.setName("pipe-uplink-handler");
@@ -287,6 +294,7 @@
   public void close() throws IOException {
     WritableUtils.writeVInt(stream, MessageType.CLOSE.code);
     LOG.debug("Sent close command");
+    stream.flush();
   }
   
   public void abort() throws IOException {



Mime
View raw message