hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From millec...@apache.org
Subject svn commit: r1526259 - in /hama/trunk: CHANGES.txt core/src/main/java/org/apache/hama/pipes/protocol/BinaryProtocol.java core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java
Date Wed, 25 Sep 2013 18:28:31 GMT
Author: millecker
Date: Wed Sep 25 18:28:31 2013
New Revision: 1526259

URL: http://svn.apache.org/r1526259
Log:
HAMA-805: Problem initializing pipes in HamaStreaming

Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/BinaryProtocol.java
    hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1526259&r1=1526258&r2=1526259&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Wed Sep 25 18:28:31 2013
@@ -10,7 +10,8 @@ Release 0.6.3 (unreleased changes)
    HAMA-801: Snappy fails on Mac OS with JDK 1.7 (Anastasis Andronidis via tommaso)
 
   BUG FIXES
-  
+
+   HAMA-805: Problem initializing pipes in HamaStreaming (Martin Illecker)  
    HAMA-789: BspPeer launched fail because port is bound by others (Suraj Menon via edwardyoon)
    HAMA-791: Fix the problem that MultilayerPerceptron fails to learn a good hypothesis sometimes.
(Yexi Jiang)
    HAMA-782: The arguments of DoubleVector.slice(int, int) method will mislead the user.
(Yexi Jiang)

Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/BinaryProtocol.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/BinaryProtocol.java?rev=1526259&r1=1526258&r2=1526259&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/BinaryProtocol.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/BinaryProtocol.java Wed Sep
25 18:28:31 2013
@@ -39,6 +39,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.pipes.Submitter;
+import org.apache.hama.pipes.protocol.UplinkReader;
 
 /**
  * This protocol is a binary implementation of the Hama Pipes protocol.
@@ -77,7 +78,8 @@ public class BinaryProtocol<K1 extends W
    * messages are public methods on this object.
    * 
    * @param jobConfig The job's configuration
-   * @param sock The socket to communicate on.
+   * @param out The output stream to communicate on.
+   * @param in The input stream to communicate on.
    * @throws IOException
    */
   public BinaryProtocol(Configuration conf, OutputStream out, InputStream in)
@@ -116,12 +118,17 @@ public class BinaryProtocol<K1 extends W
       out = new TeeOutputStream("downlink.data", out);
     }
     stream = new DataOutputStream(new BufferedOutputStream(out, BUFFER_SIZE));
-    uplink = new UplinkReader<K1, V1, K2, V2>(this, peer, in);
+    uplink = getUplinkReader(peer, in);
 
     uplink.setName("pipe-uplink-handler");
     uplink.start();
   }
 
+  public UplinkReader<K1, V1, K2, V2> getUplinkReader(
+      BSPPeer<K1, V1, K2, V2, BytesWritable> peer, InputStream in) throws IOException
{
+    return new UplinkReader<K1, V1, K2, V2>(this, peer, in);
+  }
+
   public boolean isHasTask() {
     return hasTask;
   }

Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java?rev=1526259&r1=1526258&r2=1526259&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java Wed
Sep 25 18:28:31 2013
@@ -35,6 +35,8 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.bsp.sync.SyncException;
+import org.apache.hama.pipes.protocol.UplinkReader;
+import org.apache.hama.pipes.protocol.StreamingProtocol.StreamingUplinkReaderThread;
 import org.apache.hama.util.KeyValuePair;
 
 /**
@@ -61,6 +63,13 @@ public class StreamingProtocol<K1 extend
     super(peer, out, in);
   }
 
+  @Override
+  public UplinkReader<K1, V1, Text, Text> getUplinkReader(
+      BSPPeer<K1, V1, Text, Text, BytesWritable> peer, InputStream in)
+      throws IOException {
+    return new StreamingUplinkReaderThread(peer, in);
+  }
+  
   public class StreamingUplinkReaderThread extends
       UplinkReader<K1, V1, Text, Text> {
 
@@ -265,12 +274,6 @@ public class StreamingProtocol<K1 extend
     waitOnAck();
   }
 
-  /*
-   * @Override public UplinkReaderThread getUplinkReader( BSPPeer<K1, V1, Text,
-   * Text, BytesWritable> peer, InputStream in) throws IOException { return new
-   * StreamingUplinkReaderThread(peer, in); }
-   */
-
   public void writeLine(int msg) throws IOException {
     writeLine("" + msg);
   }



Mime
View raw message