hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From millec...@apache.org
Subject svn commit: r1544761 [3/3] - in /hama/trunk: ./ c++/ c++/src/ c++/src/main/native/examples/ c++/src/main/native/examples/conf/ c++/src/main/native/examples/impl/ c++/src/main/native/examples/input/ c++/src/main/native/pipes/api/hama/ c++/src/main/nativ...
Date Sat, 23 Nov 2013 08:41:10 GMT
Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesPartitioner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesPartitioner.java?rev=1544761&r1=1544760&r2=1544761&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesPartitioner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesPartitioner.java Sat Nov 23 08:41:09 2013
@@ -21,7 +21,8 @@ import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hama.bsp.Partitioner;
 
 /**
@@ -30,12 +31,29 @@ import org.apache.hama.bsp.Partitioner;
  * BinaryProtocol -> C++ Partitioner and back
  * 
  */
-public class PipesPartitioner<K, V> implements Partitioner<K, V>,
-    PipesApplicable {
+public class PipesPartitioner<K, V> implements Partitioner<K, V> {
 
-  private static final Log LOG = LogFactory.getLog(PipesPartitioner.class
-      .getName());
-  private PipesApplication<? extends Writable, ? extends Writable, ? extends Writable, ? extends Writable, ? extends Writable> application = null;
+  private static final Log LOG = LogFactory.getLog(PipesPartitioner.class);
+  private PipesApplication<K, V, ?, ?, BytesWritable> application = new PipesApplication<K, V, Object, Object, BytesWritable>();
+
+  public PipesPartitioner(Configuration conf) {
+    LOG.debug("Start Pipes client for PipesPartitioner.");
+    try {
+      application.start(conf);
+    } catch (IOException e) {
+      LOG.error(e);
+    } catch (InterruptedException e) {
+      LOG.error(e);
+    }
+  }
+
+  public void cleanup() {
+    try {
+      application.cleanup();
+    } catch (IOException e) {
+      LOG.error(e);
+    }
+  }
 
   /**
    * Partitions a specific key value mapping to a bucket.
@@ -50,26 +68,20 @@ public class PipesPartitioner<K, V> impl
   public int getPartition(K key, V value, int numTasks) {
     int returnVal = 0;
     try {
-      // LOG.info("pipesApp==null: " + ((pipesApp == null) ? "true" : "false"));
-      // LOG.info("pipesApp.getDownlink()==null: "
-      // + ((pipesApp.getDownlink() == null) ? "true" : "false"));
-
-      // LOG.info("Class: "+value.getClass().toString());
-      if ((application != null) && (application.getDownlink() != null))
-        returnVal = application.getDownlink().getPartition(key.toString(),
-            value.toString(), numTasks);
+
+      if ((application != null) && (application.getDownlink() != null)) {
+        returnVal = application.getDownlink()
+            .getPartition(key, value, numTasks);
+      } else {
+        LOG.warn("PipesApplication or application.getDownlink() might be null! (application==null): "
+            + ((application == null) ? "true" : "false"));
+      }
 
     } catch (IOException e) {
       LOG.error(e);
     }
+    LOG.debug("getPartition returns: " + returnVal);
     return returnVal;
   }
-
-  @Override
-  public void setApplication(
-      PipesApplication<? extends Writable, ? extends Writable, ? extends Writable, ? extends Writable, ? extends Writable> pipesApp) {
-
-    this.application = pipesApp;
-  }
-
+  
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java?rev=1544761&r1=1544760&r2=1544761&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java Sat Nov 23 08:41:09 2013
@@ -40,6 +40,7 @@ import org.apache.hadoop.filecache.Distr
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.Tool;
@@ -230,9 +231,12 @@ public class Submitter implements Tool {
     setIfUnset(job.getConfiguration(), "bsp.input.value.class", textClassname);
     setIfUnset(job.getConfiguration(), "bsp.output.key.class", textClassname);
     setIfUnset(job.getConfiguration(), "bsp.output.value.class", textClassname);
+    setIfUnset(job.getConfiguration(), "bsp.message.class",
+        BytesWritable.class.getName());
 
     setIfUnset(job.getConfiguration(), "bsp.job.name", "Hama Pipes Job");
 
+    // DEBUG Output
     LOG.debug("isJavaRecordReader: "
         + getIsJavaRecordReader(job.getConfiguration()));
     LOG.debug("BspClass: " + job.getBspClass().getName());
@@ -240,13 +244,10 @@ public class Submitter implements Tool {
     LOG.debug("InputFormat: " + job.getInputFormat());
     LOG.debug("InputKeyClass: " + job.getInputKeyClass().getName());
     LOG.debug("InputValueClass: " + job.getInputValueClass().getName());
+    LOG.debug("InputFormat: " + job.getOutputFormat());
     LOG.debug("OutputKeyClass: " + job.getOutputKeyClass().getName());
     LOG.debug("OutputValueClass: " + job.getOutputValueClass().getName());
-
-    if ((!job.getOutputKeyClass().getName().equals(textClassname))
-        || (!job.getOutputValueClass().getName().equals(textClassname)))
-      throw new IllegalArgumentException(
-          "Hama Pipes does only support Text as Key/Value output!");
+    LOG.debug("MessageClass: " + job.get("bsp.message.class"));
 
     LOG.debug("bsp.master.address: "
         + job.getConfiguration().get("bsp.master.address"));
@@ -258,7 +259,8 @@ public class Submitter implements Tool {
 
     String exec = getExecutable(job.getConfiguration());
     if (exec == null) {
-      throw new IllegalArgumentException("No application defined.");
+      throw new IllegalArgumentException(
+          "No application defined. (Set property hama.pipes.executable)");
     }
 
     URI[] fileCache = DistributedCache.getCacheFiles(job.getConfiguration());
@@ -273,7 +275,7 @@ public class Submitter implements Tool {
     try {
       fileCache[0] = new URI(exec);
     } catch (URISyntaxException e) {
-      IOException ie = new IOException("Problem parsing execable URI " + exec);
+      IOException ie = new IOException("Problem parsing executable URI " + exec);
       ie.initCause(e);
       throw ie;
     }

Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/BinaryProtocol.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/BinaryProtocol.java?rev=1544761&r1=1544760&r2=1544761&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/BinaryProtocol.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/BinaryProtocol.java Sat Nov 23 08:41:09 2013
@@ -33,13 +33,13 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.pipes.Submitter;
-import org.apache.hama.pipes.protocol.UplinkReader;
 
 /**
  * This protocol is a binary implementation of the Hama Pipes protocol.
@@ -47,8 +47,8 @@ import org.apache.hama.pipes.protocol.Up
  * Adapted from Hadoop Pipes.
  * 
  */
-public class BinaryProtocol<K1 extends Writable, V1 extends Writable, K2 extends Writable, V2 extends Writable>
-    implements DownwardProtocol<K1, V1, K2, V2> {
+public class BinaryProtocol<K1, V1, K2, V2, M extends Writable> implements
+    DownwardProtocol<K1, V1, K2, V2> {
 
   protected static final Log LOG = LogFactory.getLog(BinaryProtocol.class
       .getName());
@@ -57,19 +57,16 @@ public class BinaryProtocol<K1 extends W
    * The buffer size for the command socket
    */
   protected static final int BUFFER_SIZE = 128 * 1024;
-
-  protected final DataOutputStream stream;
-  protected final DataOutputBuffer buffer = new DataOutputBuffer();
-
-  private UplinkReader<K1, V1, K2, V2> uplink;
+  protected final DataOutputStream outStream;
+  /* protected final peer is only needed by the Streaming Protocol */
+  protected final BSPPeer<K1, V1, K2, V2, M> peer;
 
   public final Object hasTaskLock = new Object();
   private boolean hasTask = false;
   public final Object resultLock = new Object();
   private Integer resultInt = null;
 
-  /* Protected final peer is only needed by the Streaming Protocol */
-  protected final BSPPeer<K1, V1, K2, V2, BytesWritable> peer;
+  private UplinkReader<K1, V1, K2, V2, M> uplink;
   private Configuration conf;
 
   /**
@@ -91,11 +88,13 @@ public class BinaryProtocol<K1 extends W
     if (Submitter.getKeepCommandFile(conf)) {
       out = new TeeOutputStream("downlink.data", out);
     }
-    stream = new DataOutputStream(new BufferedOutputStream(out, BUFFER_SIZE));
-    uplink = new UplinkReader<K1, V1, K2, V2>(this, conf, in);
 
-    uplink.setName("pipe-uplink-handler");
-    uplink.start();
+    this.outStream = new DataOutputStream(new BufferedOutputStream(out,
+        BUFFER_SIZE));
+
+    this.uplink = new UplinkReader<K1, V1, K2, V2, M>(this, conf, in);
+    this.uplink.setName("pipe-uplink-handler");
+    this.uplink.start();
   }
 
   /**
@@ -108,8 +107,8 @@ public class BinaryProtocol<K1 extends W
    * @param in The input stream to communicate on.
    * @throws IOException
    */
-  public BinaryProtocol(BSPPeer<K1, V1, K2, V2, BytesWritable> peer,
-      OutputStream out, InputStream in) throws IOException {
+  public BinaryProtocol(BSPPeer<K1, V1, K2, V2, M> peer, OutputStream out,
+      InputStream in) throws IOException {
     this.peer = peer;
     this.conf = peer.getConfiguration();
 
@@ -117,20 +116,22 @@ public class BinaryProtocol<K1 extends W
     if (Submitter.getKeepCommandFile(conf)) {
       out = new TeeOutputStream("downlink.data", out);
     }
-    stream = new DataOutputStream(new BufferedOutputStream(out, BUFFER_SIZE));
-    uplink = getUplinkReader(peer, in);
 
-    uplink.setName("pipe-uplink-handler");
-    uplink.start();
+    this.outStream = new DataOutputStream(new BufferedOutputStream(out,
+        BUFFER_SIZE));
+
+    this.uplink = getUplinkReader(peer, in);
+    this.uplink.setName("pipe-uplink-handler");
+    this.uplink.start();
   }
 
-  public UplinkReader<K1, V1, K2, V2> getUplinkReader(
-      BSPPeer<K1, V1, K2, V2, BytesWritable> peer, InputStream in) throws IOException {
-    return new UplinkReader<K1, V1, K2, V2>(this, peer, in);
+  public UplinkReader<K1, V1, K2, V2, M> getUplinkReader(
+      BSPPeer<K1, V1, K2, V2, M> peer, InputStream in) throws IOException {
+    return new UplinkReader<K1, V1, K2, V2, M>(this, peer, in);
   }
 
   public boolean isHasTask() {
-    return hasTask;
+    return this.hasTask;
   }
 
   public synchronized void setHasTask(boolean hasTask) {
@@ -141,8 +142,8 @@ public class BinaryProtocol<K1 extends W
     this.resultInt = result;
   }
 
-  public DataOutputStream getStream() {
-    return stream;
+  public DataOutputStream getOutputStream() {
+    return this.outStream;
   }
 
   /**
@@ -189,8 +190,8 @@ public class BinaryProtocol<K1 extends W
   @Override
   public void start() throws IOException {
     LOG.debug("starting downlink");
-    WritableUtils.writeVInt(stream, MessageType.START.code);
-    WritableUtils.writeVInt(stream, CURRENT_PROTOCOL_VERSION);
+    WritableUtils.writeVInt(this.outStream, MessageType.START.code);
+    WritableUtils.writeVInt(this.outStream, CURRENT_PROTOCOL_VERSION);
     flush();
     LOG.debug("Sent MessageType.START");
     setBSPJobConf(conf);
@@ -198,15 +199,15 @@ public class BinaryProtocol<K1 extends W
 
   @Override
   public void setBSPJobConf(Configuration conf) throws IOException {
-    WritableUtils.writeVInt(stream, MessageType.SET_BSPJOB_CONF.code);
+    WritableUtils.writeVInt(this.outStream, MessageType.SET_BSPJOB_CONF.code);
     List<Entry<String, String>> list = new ArrayList<Entry<String, String>>();
     for (Entry<String, String> entry : conf) {
       list.add(entry);
     }
-    WritableUtils.writeVInt(stream, list.size());
+    WritableUtils.writeVInt(this.outStream, list.size());
     for (Entry<String, String> entry : list) {
-      Text.writeString(stream, entry.getKey());
-      Text.writeString(stream, entry.getValue());
+      Text.writeString(this.outStream, entry.getKey());
+      Text.writeString(this.outStream, entry.getValue());
     }
     flush();
     LOG.debug("Sent MessageType.SET_BSPJOB_CONF including " + list.size()
@@ -216,9 +217,9 @@ public class BinaryProtocol<K1 extends W
   @Override
   public void setInputTypes(String keyType, String valueType)
       throws IOException {
-    WritableUtils.writeVInt(stream, MessageType.SET_INPUT_TYPES.code);
-    Text.writeString(stream, keyType);
-    Text.writeString(stream, valueType);
+    WritableUtils.writeVInt(this.outStream, MessageType.SET_INPUT_TYPES.code);
+    Text.writeString(this.outStream, keyType);
+    Text.writeString(this.outStream, valueType);
     flush();
     LOG.debug("Sent MessageType.SET_INPUT_TYPES");
   }
@@ -227,9 +228,9 @@ public class BinaryProtocol<K1 extends W
   public void runSetup(boolean pipedInput, boolean pipedOutput)
       throws IOException {
 
-    WritableUtils.writeVInt(stream, MessageType.RUN_SETUP.code);
-    WritableUtils.writeVInt(stream, pipedInput ? 1 : 0);
-    WritableUtils.writeVInt(stream, pipedOutput ? 1 : 0);
+    WritableUtils.writeVInt(this.outStream, MessageType.RUN_SETUP.code);
+    WritableUtils.writeVInt(this.outStream, pipedInput ? 1 : 0);
+    WritableUtils.writeVInt(this.outStream, pipedOutput ? 1 : 0);
     flush();
     setHasTask(true);
     LOG.debug("Sent MessageType.RUN_SETUP");
@@ -239,9 +240,9 @@ public class BinaryProtocol<K1 extends W
   public void runBsp(boolean pipedInput, boolean pipedOutput)
       throws IOException {
 
-    WritableUtils.writeVInt(stream, MessageType.RUN_BSP.code);
-    WritableUtils.writeVInt(stream, pipedInput ? 1 : 0);
-    WritableUtils.writeVInt(stream, pipedOutput ? 1 : 0);
+    WritableUtils.writeVInt(this.outStream, MessageType.RUN_BSP.code);
+    WritableUtils.writeVInt(this.outStream, pipedInput ? 1 : 0);
+    WritableUtils.writeVInt(this.outStream, pipedOutput ? 1 : 0);
     flush();
     setHasTask(true);
     LOG.debug("Sent MessageType.RUN_BSP");
@@ -251,32 +252,37 @@ public class BinaryProtocol<K1 extends W
   public void runCleanup(boolean pipedInput, boolean pipedOutput)
       throws IOException {
 
-    WritableUtils.writeVInt(stream, MessageType.RUN_CLEANUP.code);
-    WritableUtils.writeVInt(stream, pipedInput ? 1 : 0);
-    WritableUtils.writeVInt(stream, pipedOutput ? 1 : 0);
+    WritableUtils.writeVInt(this.outStream, MessageType.RUN_CLEANUP.code);
+    WritableUtils.writeVInt(this.outStream, pipedInput ? 1 : 0);
+    WritableUtils.writeVInt(this.outStream, pipedOutput ? 1 : 0);
     flush();
     setHasTask(true);
     LOG.debug("Sent MessageType.RUN_CLEANUP");
   }
 
   @Override
-  public int getPartition(String key, String value, int numTasks)
-      throws IOException {
+  public int getPartition(K1 key, V1 value, int numTasks) throws IOException {
 
-    WritableUtils.writeVInt(stream, MessageType.PARTITION_REQUEST.code);
-    Text.writeString(stream, key);
-    Text.writeString(stream, value);
-    WritableUtils.writeVInt(stream, numTasks);
+    WritableUtils.writeVInt(this.outStream, MessageType.PARTITION_REQUEST.code);
+    writeObject((Writable) key);
+    writeObject((Writable) value);
+    WritableUtils.writeVInt(this.outStream, numTasks);
     flush();
-    LOG.debug("Sent MessageType.PARTITION_REQUEST - key: " + key + " value: "
-        + value.substring(0, 10) + "..." + " numTasks: " + numTasks);
+
+    LOG.debug("Sent MessageType.PARTITION_REQUEST - key: "
+        + ((key.toString().length() < 10) ? key.toString() : key.toString()
+            .substring(0, 9) + "...")
+        + " value: "
+        + ((value.toString().length() < 10) ? value.toString() : value
+            .toString().substring(0, 9) + "...") + " numTasks: " + numTasks);
 
     int resultVal = 0;
 
-    synchronized (resultLock) {
+    synchronized (this.resultLock) {
       try {
-        while (resultInt == null)
-          resultLock.wait();
+        while (resultInt == null) {
+          this.resultLock.wait();
+        }
 
         resultVal = resultInt;
         resultInt = null;
@@ -290,14 +296,14 @@ public class BinaryProtocol<K1 extends W
 
   @Override
   public void abort() throws IOException {
-    WritableUtils.writeVInt(stream, MessageType.ABORT.code);
+    WritableUtils.writeVInt(this.outStream, MessageType.ABORT.code);
     flush();
     LOG.debug("Sent MessageType.ABORT");
   }
 
   @Override
   public void flush() throws IOException {
-    stream.flush();
+    this.outStream.flush();
   }
 
   /**
@@ -310,22 +316,26 @@ public class BinaryProtocol<K1 extends W
   public void close() throws IOException, InterruptedException {
     // runCleanup(pipedInput,pipedOutput);
     LOG.debug("closing connection");
-    endOfInput();
 
-    uplink.interrupt();
-    uplink.join();
+    // Only send closing message back in Hama Pipes NOT in Hama Streaming
+    boolean streamingEnabled = conf.getBoolean("hama.streaming.enabled", false);
+    if (!streamingEnabled) {
+      endOfInput();
+    }
+    this.uplink.interrupt();
+    this.uplink.join();
 
-    uplink.closeConnection();
-    stream.close();
+    this.uplink.closeConnection();
+    this.outStream.close();
   }
 
   @Override
   public boolean waitForFinish() throws IOException, InterruptedException {
     // LOG.debug("waitForFinish... "+hasTask);
-    synchronized (hasTaskLock) {
+    synchronized (this.hasTaskLock) {
       try {
-        while (hasTask)
-          hasTaskLock.wait();
+        while (this.hasTask)
+          this.hasTaskLock.wait();
 
       } catch (InterruptedException e) {
         LOG.error(e);
@@ -336,40 +346,58 @@ public class BinaryProtocol<K1 extends W
   }
 
   public void endOfInput() throws IOException {
-    WritableUtils.writeVInt(stream, MessageType.CLOSE.code);
+    WritableUtils.writeVInt(this.outStream, MessageType.CLOSE.code);
     flush();
     LOG.debug("Sent close command");
     LOG.debug("Sent MessageType.CLOSE");
   }
 
   /**
-   * Write the given object to the stream. If it is a Text or BytesWritable,
-   * write it directly. Otherwise, write it to a buffer and then write the
-   * length and data to the stream.
+   * Write the given object to the stream. If it is an IntWritable, LongWritable,
+   * FloatWritable, DoubleWritable, Text or BytesWritable, write it directly.
+   * Otherwise, write it to a buffer and then write the length and data to the
+   * stream.
    * 
    * @param obj the object to write
    * @throws IOException
    */
   protected void writeObject(Writable obj) throws IOException {
-    // For Text and BytesWritable, encode them directly, so that they end up
+    // For basic types IntWritable, LongWritable, FloatWritable, DoubleWritable,
+    // Text and BytesWritable, encode them directly, so that they end up
     // in C++ as the natural translations.
     if (obj instanceof Text) {
       Text t = (Text) obj;
       int len = t.getLength();
-      WritableUtils.writeVInt(stream, len);
-      stream.write(t.getBytes(), 0, len);
+      WritableUtils.writeVInt(this.outStream, len);
+      this.outStream.write(t.getBytes(), 0, len);
+
     } else if (obj instanceof BytesWritable) {
       BytesWritable b = (BytesWritable) obj;
       int len = b.getLength();
-      WritableUtils.writeVInt(stream, len);
-      stream.write(b.getBytes(), 0, len);
+      WritableUtils.writeVInt(this.outStream, len);
+      this.outStream.write(b.getBytes(), 0, len);
+
+    } else if (obj instanceof IntWritable) {
+      WritableUtils.writeVInt(this.outStream, ((IntWritable) obj).get());
+
+    } else if (obj instanceof LongWritable) {
+      WritableUtils.writeVLong(this.outStream, ((LongWritable) obj).get());
+
+      // else if ((obj instanceof FloatWritable) || (obj instanceof
+      // DoubleWritable))
+
     } else {
-      buffer.reset();
-      obj.write(buffer);
-      int length = buffer.getLength();
-      WritableUtils.writeVInt(stream, length);
-      stream.write(buffer.getData(), 0, length);
+      // Note: other types are transferred as String which should be implemented
+      // in Writable itself
+
+      // DataOutputBuffer buffer = new DataOutputBuffer();
+      // buffer.reset();
+      // obj.write(buffer);
+      // int length = buffer.getLength();
+      // WritableUtils.writeVInt(stream, length);
+      // stream.write(buffer.getData(), 0, length);
+
+      obj.write(this.outStream);
     }
   }
-
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/DownwardProtocol.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/DownwardProtocol.java?rev=1544761&r1=1544760&r2=1544761&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/DownwardProtocol.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/DownwardProtocol.java Sat Nov 23 08:41:09 2013
@@ -21,7 +21,6 @@ package org.apache.hama.pipes.protocol;
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Writable;
 
 /**
  * The abstract description of the downward (from Java to C++) Pipes protocol.
@@ -31,7 +30,7 @@ import org.apache.hadoop.io.Writable;
  * Adapted from Hadoop Pipes.
  * 
  */
-public interface DownwardProtocol<K1 extends Writable, V1 extends Writable, K2 extends Writable, V2 extends Writable> {
+public interface DownwardProtocol<K1, V1, K2, V2> {
 
   /**
    * Start communication
@@ -82,7 +81,7 @@ public interface DownwardProtocol<K1 ext
    * 
    * @throws IOException
    */
-  int getPartition(String key, String value, int numTasks) throws IOException;
+  int getPartition(K1 key, V1 value, int numTasks) throws IOException;
 
   /**
    * The task should stop as soon as possible, because something has gone wrong.

Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/MessageType.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/MessageType.java?rev=1544761&r1=1544760&r2=1544761&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/MessageType.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/MessageType.java Sat Nov 23 08:41:09 2013
@@ -32,7 +32,7 @@ public enum MessageType {
   REOPEN_INPUT(17), CLEAR(18), CLOSE(19), ABORT(20), DONE(21), TASK_DONE(22),
   REGISTER_COUNTER(23), INCREMENT_COUNTER(24), SEQFILE_OPEN(25),
   SEQFILE_READNEXT(26), SEQFILE_APPEND(27), SEQFILE_CLOSE(28),
-  PARTITION_REQUEST(29), PARTITION_RESPONSE(30), LOG(31);
+  PARTITION_REQUEST(29), PARTITION_RESPONSE(30), LOG(31), END_OF_DATA(32);
 
   final int code;
 

Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java?rev=1544761&r1=1544760&r2=1544761&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java Sat Nov 23 08:41:09 2013
@@ -49,7 +49,7 @@ import org.apache.hama.commons.util.KeyV
  * @param <V2> output value.
  */
 public class StreamingProtocol<K1 extends Writable, V1 extends Writable>
-    extends BinaryProtocol<K1, V1, Text, Text> {
+    extends BinaryProtocol<K1, V1, Text, Text, BytesWritable> {
 
   private static final Pattern PROTOCOL_STRING_PATTERN = Pattern.compile("=");
 
@@ -62,21 +62,22 @@ public class StreamingProtocol<K1 extend
   }
 
   @Override
-  public UplinkReader<K1, V1, Text, Text> getUplinkReader(
+  public UplinkReader<K1, V1, Text, Text, BytesWritable> getUplinkReader(
       BSPPeer<K1, V1, Text, Text, BytesWritable> peer, InputStream in)
       throws IOException {
-    return new StreamingUplinkReaderThread(peer, in);
+    return new StreamingUplinkReaderThread(this, peer, in);
   }
-  
+
   public class StreamingUplinkReaderThread extends
-      UplinkReader<K1, V1, Text, Text> {
+      UplinkReader<K1, V1, Text, Text, BytesWritable> {
 
     private BufferedReader reader;
 
     public StreamingUplinkReaderThread(
+        BinaryProtocol<K1, V1, Text, Text, BytesWritable> binaryProtocol,
         BSPPeer<K1, V1, Text, Text, BytesWritable> peer, InputStream stream)
         throws IOException {
-      super(null, peer, stream);
+      super(binaryProtocol, peer, stream);
       reader = new BufferedReader(new InputStreamReader(inStream));
     }
 
@@ -164,7 +165,7 @@ public class StreamingProtocol<K1 extend
     }
 
     @Override
-    public int readCommand() throws IOException {
+    protected int readCommand() throws IOException {
       String readLine = reader.readLine();
       if (readLine != null && !readLine.isEmpty()) {
         String[] split = PROTOCOL_STRING_PATTERN.split(readLine, 2);
@@ -211,6 +212,10 @@ public class StreamingProtocol<K1 extend
 
   }
 
+  /* ************************************************************ */
+  /* Override Implementation of DownwardProtocol<K1, V1, K2, V2> */
+  /* ************************************************************ */
+
   @Override
   public void start() throws IOException {
     writeLine(MessageType.START, null);
@@ -277,14 +282,14 @@ public class StreamingProtocol<K1 extend
   }
 
   public void writeLine(String msg) throws IOException {
-    stream.write((msg + "\n").getBytes());
-    stream.flush();
+    outStream.write((msg + "\n").getBytes());
+    outStream.flush();
   }
 
   public void writeLine(MessageType type, String msg) throws IOException {
-    stream.write((getProtocolString(type) + (msg == null ? "" : msg) + "\n")
+    outStream.write((getProtocolString(type) + (msg == null ? "" : msg) + "\n")
         .getBytes());
-    stream.flush();
+    outStream.flush();
   }
 
   public String getProtocolString(MessageType type) {

Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java?rev=1544761&r1=1544760&r2=1544761&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java Sat Nov 23 08:41:09 2013
@@ -34,6 +34,8 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
@@ -46,24 +48,23 @@ import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.bsp.sync.SyncException;
 import org.apache.hama.commons.util.KeyValuePair;
 
-public class UplinkReader<K1 extends Writable, V1 extends Writable, K2 extends Writable, V2 extends Writable>
+public class UplinkReader<KEYIN, VALUEIN, KEYOUT, VALUEOUT, M extends Writable>
     extends Thread {
 
   private static final Log LOG = LogFactory.getLog(UplinkReader.class);
 
-  protected DataInputStream inStream;
-  private K2 key;
-  private V2 value;
-  
-  private BinaryProtocol<K1, V1, K2, V2> binProtocol;
-  private BSPPeer<K1, V1, K2, V2, BytesWritable> peer = null;
+  private BinaryProtocol<KEYIN, VALUEIN, KEYOUT, VALUEOUT, M> binProtocol;
+  private BSPPeer<KEYIN, VALUEIN, KEYOUT, VALUEOUT, M> peer = null;
   private Configuration conf;
-  
-  private Map<Integer, Entry<SequenceFile.Reader, Entry<String, String>>> sequenceFileReaders;
-  private Map<Integer, Entry<SequenceFile.Writer, Entry<String, String>>> sequenceFileWriters;
 
-  @SuppressWarnings("unchecked")
-  public UplinkReader(BinaryProtocol<K1, V1, K2, V2> binaryProtocol,
+  protected DataInputStream inStream;
+  protected DataOutputStream outStream;
+
+  private Map<Integer, Entry<SequenceFile.Reader, Entry<Writable, Writable>>> sequenceFileReaders;
+  private Map<Integer, Entry<SequenceFile.Writer, Entry<Writable, Writable>>> sequenceFileWriters;
+
+  public UplinkReader(
+      BinaryProtocol<KEYIN, VALUEIN, KEYOUT, VALUEOUT, M> binaryProtocol,
       Configuration conf, InputStream stream) throws IOException {
 
     this.binProtocol = binaryProtocol;
@@ -72,18 +73,15 @@ public class UplinkReader<K1 extends Wri
     this.inStream = new DataInputStream(new BufferedInputStream(stream,
         BinaryProtocol.BUFFER_SIZE));
 
-    this.key = (K2) ReflectionUtils.newInstance((Class<? extends K2>) conf
-        .getClass("bsp.output.key.class", Object.class), conf);
+    this.outStream = binProtocol.getOutputStream();
 
-    this.value = (V2) ReflectionUtils.newInstance((Class<? extends V2>) conf
-        .getClass("bsp.output.value.class", Object.class), conf);
-
-    this.sequenceFileReaders = new HashMap<Integer, Entry<SequenceFile.Reader, Entry<String, String>>>();
-    this.sequenceFileWriters = new HashMap<Integer, Entry<SequenceFile.Writer, Entry<String, String>>>();
+    this.sequenceFileReaders = new HashMap<Integer, Entry<SequenceFile.Reader, Entry<Writable, Writable>>>();
+    this.sequenceFileWriters = new HashMap<Integer, Entry<SequenceFile.Writer, Entry<Writable, Writable>>>();
   }
 
-  public UplinkReader(BinaryProtocol<K1, V1, K2, V2> binaryProtocol,
-      BSPPeer<K1, V1, K2, V2, BytesWritable> peer, InputStream stream)
+  public UplinkReader(
+      BinaryProtocol<KEYIN, VALUEIN, KEYOUT, VALUEOUT, M> binaryProtocol,
+      BSPPeer<KEYIN, VALUEIN, KEYOUT, VALUEOUT, M> peer, InputStream stream)
       throws IOException {
     this(binaryProtocol, peer.getConfiguration(), stream);
     this.peer = peer;
@@ -93,10 +91,6 @@ public class UplinkReader<K1 extends Wri
     return this.peer != null;
   }
 
-  public void closeConnection() throws IOException {
-    inStream.close();
-  }
-
   @Override
   public void run() {
     while (true) {
@@ -108,7 +102,7 @@ public class UplinkReader<K1 extends Wri
         int cmd = readCommand();
         if (cmd == -1)
           continue;
-        LOG.debug("Handling uplink command " + cmd);
+        LOG.debug("Handling uplink command: " + MessageType.values()[cmd]);
 
         if (cmd == MessageType.WRITE_KEYVALUE.code && isPeerAvailable()) { // INCOMING
           writeKeyValue();
@@ -182,12 +176,18 @@ public class UplinkReader<K1 extends Wri
     }
   }
 
+  // onError is overridden by StreamingProtocol in Hama Streaming
   protected void onError(Throwable e) {
     LOG.error(StringUtils.stringifyException(e));
   }
 
-  public int readCommand() throws IOException {
-    return WritableUtils.readVInt(inStream);
+  // readCommand is overridden by StreamingProtocol in Hama Streaming
+  protected int readCommand() throws IOException {
+    return WritableUtils.readVInt(this.inStream);
+  }
+
+  public void closeConnection() throws IOException {
+    this.inStream.close();
   }
 
   public void reopenInput() throws IOException {
@@ -196,51 +196,47 @@ public class UplinkReader<K1 extends Wri
   }
 
   public void getSuperstepCount() throws IOException {
-    DataOutputStream stream = binProtocol.getStream();
-    WritableUtils.writeVInt(stream, MessageType.GET_SUPERSTEP_COUNT.code);
-    WritableUtils.writeVLong(stream, peer.getSuperstepCount());
+    WritableUtils.writeVInt(this.outStream,
+        MessageType.GET_SUPERSTEP_COUNT.code);
+    WritableUtils.writeVLong(this.outStream, peer.getSuperstepCount());
     binProtocol.flush();
-
     LOG.debug("Responded MessageType.GET_SUPERSTEP_COUNT - SuperstepCount: "
         + peer.getSuperstepCount());
   }
 
   public void getPeerCount() throws IOException {
-    DataOutputStream stream = binProtocol.getStream();
-    WritableUtils.writeVInt(stream, MessageType.GET_PEER_COUNT.code);
-    WritableUtils.writeVInt(stream, peer.getNumPeers());
+    WritableUtils.writeVInt(this.outStream, MessageType.GET_PEER_COUNT.code);
+    WritableUtils.writeVInt(this.outStream, peer.getNumPeers());
     binProtocol.flush();
     LOG.debug("Responded MessageType.GET_PEER_COUNT - NumPeers: "
         + peer.getNumPeers());
   }
 
   public void getPeerIndex() throws IOException {
-    DataOutputStream stream = binProtocol.getStream();
-    WritableUtils.writeVInt(stream, MessageType.GET_PEER_INDEX.code);
-    WritableUtils.writeVInt(stream, peer.getPeerIndex());
+    WritableUtils.writeVInt(this.outStream, MessageType.GET_PEER_INDEX.code);
+    WritableUtils.writeVInt(this.outStream, peer.getPeerIndex());
     binProtocol.flush();
     LOG.debug("Responded MessageType.GET_PEER_INDEX - PeerIndex: "
         + peer.getPeerIndex());
   }
 
   public void getPeerName() throws IOException {
-    DataOutputStream stream = binProtocol.getStream();
-    int id = WritableUtils.readVInt(inStream);
+    int id = WritableUtils.readVInt(this.inStream);
     LOG.debug("Got MessageType.GET_PEERNAME id: " + id);
 
-    WritableUtils.writeVInt(stream, MessageType.GET_PEERNAME.code);
+    WritableUtils.writeVInt(this.outStream, MessageType.GET_PEERNAME.code);
     if (id == -1) { // -1 indicates get own PeerName
-      Text.writeString(stream, peer.getPeerName());
+      Text.writeString(this.outStream, peer.getPeerName());
       LOG.debug("Responded MessageType.GET_PEERNAME - Get Own PeerName: "
           + peer.getPeerName());
 
     } else if ((id < -1) || (id >= peer.getNumPeers())) {
       // if no PeerName for this index is found write emptyString
-      Text.writeString(stream, "");
+      Text.writeString(this.outStream, "");
       LOG.debug("Responded MessageType.GET_PEERNAME - Empty PeerName!");
 
     } else {
-      Text.writeString(stream, peer.getPeerName(id));
+      Text.writeString(this.outStream, peer.getPeerName(id));
       LOG.debug("Responded MessageType.GET_PEERNAME - PeerName: "
           + peer.getPeerName(id));
     }
@@ -248,13 +244,12 @@ public class UplinkReader<K1 extends Wri
   }
 
   public void getAllPeerNames() throws IOException {
-    DataOutputStream stream = binProtocol.getStream();
     LOG.debug("Got MessageType.GET_ALL_PEERNAME");
-    WritableUtils.writeVInt(stream, MessageType.GET_ALL_PEERNAME.code);
-    WritableUtils.writeVInt(stream, peer.getAllPeerNames().length);
-    for (String s : peer.getAllPeerNames())
-      Text.writeString(stream, s);
-
+    WritableUtils.writeVInt(this.outStream, MessageType.GET_ALL_PEERNAME.code);
+    WritableUtils.writeVInt(this.outStream, peer.getAllPeerNames().length);
+    for (String s : peer.getAllPeerNames()) {
+      Text.writeString(this.outStream, s);
+    }
     binProtocol.flush();
     LOG.debug("Responded MessageType.GET_ALL_PEERNAME - peerNamesCount: "
         + peer.getAllPeerNames().length);
@@ -266,44 +261,59 @@ public class UplinkReader<K1 extends Wri
   }
 
   public void getMessage() throws IOException {
-    DataOutputStream stream = binProtocol.getStream();
     LOG.debug("Got MessageType.GET_MSG");
-    WritableUtils.writeVInt(stream, MessageType.GET_MSG.code);
-    BytesWritable msg = peer.getCurrentMessage();
-    if (msg != null)
-      binProtocol.writeObject(msg);
-
+    WritableUtils.writeVInt(this.outStream, MessageType.GET_MSG.code);
+    Writable message = peer.getCurrentMessage();
+    if (message != null) {
+      binProtocol.writeObject(message);
+    }
     binProtocol.flush();
-    LOG.debug("Responded MessageType.GET_MSG - Message(BytesWritable) ");// +msg);
+    String text = String.valueOf(message); // "null" when no message was available
+    LOG.debug("Responded MessageType.GET_MSG - Message: "
+        + ((text.length() < 10) ? text : text.substring(0, 9) + "..."));
   }
 
   public void getMessageCount() throws IOException {
-    DataOutputStream stream = binProtocol.getStream();
-    WritableUtils.writeVInt(stream, MessageType.GET_MSG_COUNT.code);
-    WritableUtils.writeVInt(stream, peer.getNumCurrentMessages());
+    WritableUtils.writeVInt(this.outStream, MessageType.GET_MSG_COUNT.code);
+    WritableUtils.writeVInt(this.outStream, peer.getNumCurrentMessages());
     binProtocol.flush();
     LOG.debug("Responded MessageType.GET_MSG_COUNT - Count: "
         + peer.getNumCurrentMessages());
   }
 
-  public void sendMessage() throws IOException {
-    String peerName = Text.readString(inStream);
-    BytesWritable msg = new BytesWritable();
-    readObject(msg);
-    LOG.debug("Got MessageType.SEND_MSG to peerName: " + peerName);
-    peer.send(peerName, msg);
+  public void incrementCounter() throws IOException {
+    String group = Text.readString(this.inStream);
+    String name = Text.readString(this.inStream);
+    long amount = WritableUtils.readVLong(this.inStream);
+    peer.incrementCounter(group, name, amount);
   }
 
-  public void incrementCounter() throws IOException {
-    // int id = WritableUtils.readVInt(inStream);
-    String group = Text.readString(inStream);
-    String name = Text.readString(inStream);
-    long amount = WritableUtils.readVLong(inStream);
-    peer.incrementCounter(name, group, amount);
+  @SuppressWarnings("unchecked")
+  public void sendMessage() throws IOException, InstantiationException,
+      IllegalAccessException {
+    String peerName = Text.readString(this.inStream);
+
+    M message = (M) ReflectionUtils.newInstance((Class<? extends M>) conf
+        .getClass("bsp.message.class", BytesWritable.class), conf);
+
+    LOG.debug("Got MessageType.SEND_MSG peerName: " + peerName
+        + " messageClass: " + message.getClass().getName());
+
+    readObject(message);
+
+    peer.send(peerName, message);
+
+    LOG.debug("Done MessageType.SEND_MSG to peerName: "
+        + peerName
+        + " messageClass: "
+        + message.getClass().getName()
+        + " Message: "
+        + ((message.toString().length() < 10) ? message.toString() : message
+            .toString().substring(0, 9) + "..."));
   }
 
   public void readKeyValue() throws IOException {
-    DataOutputStream stream = binProtocol.getStream();
+
     boolean nullinput = peer.getConfiguration().get(
         Constants.INPUT_FORMAT_CLASS) == null
         || peer.getConfiguration().get(Constants.INPUT_FORMAT_CLASS)
@@ -311,54 +321,73 @@ public class UplinkReader<K1 extends Wri
 
     if (!nullinput) {
 
-      KeyValuePair<K1, V1> pair = peer.readNext();
+      KeyValuePair<KEYIN, VALUEIN> pair = peer.readNext();
 
-      WritableUtils.writeVInt(stream, MessageType.READ_KEYVALUE.code);
       if (pair != null) {
-        binProtocol.writeObject(new Text(pair.getKey().toString()));
-        String valueStr = pair.getValue().toString();
-        binProtocol.writeObject(new Text(valueStr));
-
-        LOG.debug("Responded MessageType.READ_KEYVALUE - Key: "
-            + pair.getKey()
+        WritableUtils.writeVInt(this.outStream, MessageType.READ_KEYVALUE.code);
+        binProtocol.writeObject((Writable) pair.getKey());
+        binProtocol.writeObject((Writable) pair.getValue());
+
+        LOG.debug("Responded MessageType.READ_KEYVALUE -"
+            + " Key: "
+            + ((pair.getKey().toString().length() < 10) ? pair.getKey()
+                .toString() : pair.getKey().toString().substring(0, 9) + "...")
             + " Value: "
-            + ((valueStr.length() < 10) ? valueStr : valueStr.substring(0, 9)
+            + ((pair.getValue().toString().length() < 10) ? pair.getValue()
+                .toString() : pair.getValue().toString().substring(0, 9)
                 + "..."));
 
       } else {
-        Text.writeString(stream, "");
-        Text.writeString(stream, "");
-        LOG.debug("Responded MessageType.READ_KEYVALUE - EMPTY KeyValue Pair");
+        WritableUtils.writeVInt(this.outStream, MessageType.END_OF_DATA.code);
+        LOG.debug("Responded MessageType.READ_KEYVALUE - END_OF_DATA");
       }
       binProtocol.flush();
 
     } else {
-      /* TODO */
-      /* Send empty Strings to show no KeyValue pair is available */
-      WritableUtils.writeVInt(stream, MessageType.READ_KEYVALUE.code);
-      Text.writeString(stream, "");
-      Text.writeString(stream, "");
+      WritableUtils.writeVInt(this.outStream, MessageType.END_OF_DATA.code);
       binProtocol.flush();
-      LOG.debug("Responded MessageType.READ_KEYVALUE - EMPTY KeyValue Pair");
+      LOG.debug("Responded MessageType.READ_KEYVALUE - END_OF_DATA");
     }
   }
 
+  @SuppressWarnings("unchecked")
   public void writeKeyValue() throws IOException {
-    readObject(key); // string or binary only
-    readObject(value); // string or binary only
-    if (LOG.isDebugEnabled())
-      LOG.debug("Got MessageType.WRITE_KEYVALUE - Key: " + key + " Value: "
-          + value);
-    peer.write(key, value);
+
+    KEYOUT keyOut = (KEYOUT) ReflectionUtils.newInstance(
+        (Class<? extends KEYOUT>) conf.getClass("bsp.output.key.class",
+            Object.class), conf);
+
+    VALUEOUT valueOut = (VALUEOUT) ReflectionUtils.newInstance(
+        (Class<? extends VALUEOUT>) conf.getClass("bsp.output.value.class",
+            Object.class), conf);
+
+    LOG.debug("Got MessageType.WRITE_KEYVALUE keyOutClass: "
+        + keyOut.getClass().getName() + " valueOutClass: " + valueOut.getClass().getName());
+
+    readObject((Writable) keyOut);
+    readObject((Writable) valueOut);
+
+    peer.write(keyOut, valueOut);
+
+    LOG.debug("Done MessageType.WRITE_KEYVALUE -"
+        + " Key: "
+        + ((keyOut.toString().length() < 10) ? keyOut.toString() : keyOut
+            .toString().substring(0, 9) + "...")
+        + " Value: "
+        + ((valueOut.toString().length() < 10) ? valueOut.toString() : valueOut
+            .toString().substring(0, 9) + "..."));
   }
 
   public void seqFileOpen() throws IOException {
-    String path = Text.readString(inStream);
+    String path = Text.readString(this.inStream);
     // option - read = "r" or write = "w"
-    String option = Text.readString(inStream);
-    // key and value Type stored in the SequenceFile
-    String keyType = Text.readString(inStream);
-    String valueType = Text.readString(inStream);
+    String option = Text.readString(this.inStream);
+    // key and value class stored in the SequenceFile
+    String keyClass = Text.readString(this.inStream);
+    String valueClass = Text.readString(this.inStream);
+    LOG.debug("GOT MessageType.SEQFILE_OPEN - Option: " + option);
+    LOG.debug("GOT MessageType.SEQFILE_OPEN - KeyClass: " + keyClass);
+    LOG.debug("GOT MessageType.SEQFILE_OPEN - ValueClass: " + valueClass);
 
     int fileID = -1;
 
@@ -367,98 +396,165 @@ public class UplinkReader<K1 extends Wri
       SequenceFile.Reader reader;
       try {
         reader = new SequenceFile.Reader(fs, new Path(path), conf);
+
+        // try to load key and value class
+        Class<?> sequenceKeyClass = conf.getClassLoader().loadClass(keyClass);
+        Class<?> sequenceValueClass = conf.getClassLoader().loadClass(
+            valueClass);
+
+        // try to instantiate key and value class
+        Writable sequenceKeyWritable = (Writable) ReflectionUtils.newInstance(
+            sequenceKeyClass, conf);
+        Writable sequenceValueWritable = (Writable) ReflectionUtils
+            .newInstance(sequenceValueClass, conf);
+
+        // put new fileID and key and value Writable instances into HashMap
         fileID = reader.hashCode();
         sequenceFileReaders
             .put(
                 fileID,
-                new AbstractMap.SimpleEntry<SequenceFile.Reader, Entry<String, String>>(
-                    reader, new AbstractMap.SimpleEntry<String, String>(
-                        keyType, valueType)));
+                new AbstractMap.SimpleEntry<SequenceFile.Reader, Entry<Writable, Writable>>(
+                    reader, new AbstractMap.SimpleEntry<Writable, Writable>(
+                        sequenceKeyWritable, sequenceValueWritable)));
+
       } catch (IOException e) {
         fileID = -1;
+      } catch (ClassNotFoundException e) {
+        fileID = -1;
       }
 
     } else if (option.equals("w")) {
       SequenceFile.Writer writer;
       try {
-        writer = new SequenceFile.Writer(fs, conf, new Path(path), Text.class,
-            Text.class);
+
+        // try to load key and value class
+        Class<?> sequenceKeyClass = conf.getClassLoader().loadClass(keyClass);
+        Class<?> sequenceValueClass = conf.getClassLoader().loadClass(
+            valueClass);
+
+        writer = new SequenceFile.Writer(fs, conf, new Path(path),
+            sequenceKeyClass, sequenceValueClass);
+
+        // try to instantiate key and value class
+        Writable sequenceKeyWritable = (Writable) ReflectionUtils.newInstance(
+            sequenceKeyClass, conf);
+        Writable sequenceValueWritable = (Writable) ReflectionUtils
+            .newInstance(sequenceValueClass, conf);
+
+        // put new fileID and key and value Writable instances into HashMap
         fileID = writer.hashCode();
         sequenceFileWriters
             .put(
                 fileID,
-                new AbstractMap.SimpleEntry<SequenceFile.Writer, Entry<String, String>>(
-                    writer, new AbstractMap.SimpleEntry<String, String>(
-                        keyType, valueType)));
+                new AbstractMap.SimpleEntry<SequenceFile.Writer, Entry<Writable, Writable>>(
+                    writer, new AbstractMap.SimpleEntry<Writable, Writable>(
+                        sequenceKeyWritable, sequenceValueWritable)));
+
       } catch (IOException e) {
         fileID = -1;
+      } catch (ClassNotFoundException e) {
+        fileID = -1;
       }
     }
 
-    DataOutputStream stream = binProtocol.getStream();
-    WritableUtils.writeVInt(stream, MessageType.SEQFILE_OPEN.code);
-    WritableUtils.writeVInt(stream, fileID);
+    WritableUtils.writeVInt(this.outStream, MessageType.SEQFILE_OPEN.code);
+    WritableUtils.writeVInt(this.outStream, fileID);
     binProtocol.flush();
     LOG.debug("Responded MessageType.SEQFILE_OPEN - FileID: " + fileID);
   }
 
-  public void seqFileReadNext() throws IOException, ClassNotFoundException {
-    int fileID = WritableUtils.readVInt(inStream);
-    // LOG.debug("GOT MessageType.SEQFILE_READNEXT - FileID: " + fileID);
-
-    Class<?> keyType = conf.getClassLoader().loadClass(
-        sequenceFileReaders.get(fileID).getValue().getKey());
-    Writable key = (Writable) ReflectionUtils.newInstance(keyType, conf);
-
-    Class<?> valueType = conf.getClassLoader().loadClass(
-        sequenceFileReaders.get(fileID).getValue().getValue());
-    Writable value = (Writable) ReflectionUtils.newInstance(valueType, conf);
+  public void seqFileReadNext() throws IOException {
+    int fileID = WritableUtils.readVInt(this.inStream);
+    LOG.debug("GOT MessageType.SEQFILE_READNEXT - FileID: " + fileID);
 
-    if (sequenceFileReaders.containsKey(fileID))
-      sequenceFileReaders.get(fileID).getKey().next(key, value);
+    // check if fileID is available in sequenceFileReader
+    if (sequenceFileReaders.containsKey(fileID)) {
 
-    // RESPOND
-    DataOutputStream stream = binProtocol.getStream();
-    WritableUtils.writeVInt(stream, MessageType.SEQFILE_READNEXT.code);
-    try {
-      String k = key.toString();
-      String v = value.toString();
-      Text.writeString(stream, k);
-      Text.writeString(stream, v);
-      LOG.debug("Responded MessageType.SEQFILE_READNEXT - key: " + k
-          + " value: " + ((v.length() < 10) ? v : v.substring(0, 9) + "..."));
-
-    } catch (NullPointerException e) { // key or value is null
-
-      Text.writeString(stream, "");
-      Text.writeString(stream, "");
-      LOG.debug("Responded MessageType.SEQFILE_READNEXT - EMPTY KeyValue Pair");
+      Writable sequenceKeyWritable = sequenceFileReaders.get(fileID).getValue()
+          .getKey();
+      Writable sequenceValueWritable = sequenceFileReaders.get(fileID)
+          .getValue().getValue();
+
+      // try to read next key/value pair from SequenceFile.Reader
+      if (sequenceFileReaders.get(fileID).getKey()
+          .next(sequenceKeyWritable, sequenceValueWritable)) {
+
+        WritableUtils.writeVInt(this.outStream,
+            MessageType.SEQFILE_READNEXT.code);
+        binProtocol.writeObject(sequenceKeyWritable);
+        binProtocol.writeObject(sequenceValueWritable);
+
+        LOG.debug("Responded MessageType.SEQFILE_READNEXT -"
+            + " Key: "
+            + ((sequenceKeyWritable.toString().length() < 10) ? sequenceKeyWritable
+                .toString() : sequenceKeyWritable.toString().substring(0, 9)
+                + "...")
+            + " Value: "
+            + ((sequenceValueWritable.toString().length() < 10) ? sequenceValueWritable
+                .toString() : sequenceValueWritable.toString().substring(0, 9)
+                + "..."));
+
+      } else { // false when at end of file
+
+        WritableUtils.writeVInt(this.outStream, MessageType.END_OF_DATA.code);
+        LOG.debug("Responded MessageType.SEQFILE_READNEXT - END_OF_DATA");
+      }
+      binProtocol.flush();
+
+    } else { // no fileID stored
+      LOG.warn("SequenceFileReader: FileID " + fileID + " not found!");
+      WritableUtils.writeVInt(this.outStream, MessageType.END_OF_DATA.code);
+      LOG.debug("Responded MessageType.SEQFILE_READNEXT - END_OF_DATA");
+      binProtocol.flush();
     }
-    binProtocol.flush();
   }
 
   public void seqFileAppend() throws IOException {
-    int fileID = WritableUtils.readVInt(inStream);
-    String keyStr = Text.readString(inStream);
-    String valueStr = Text.readString(inStream);
+    int fileID = WritableUtils.readVInt(this.inStream);
+    LOG.debug("GOT MessageType.SEQFILE_APPEND - FileID: " + fileID);
 
     boolean result = false;
+
+    // check if fileID is available in sequenceFileWriter
     if (sequenceFileWriters.containsKey(fileID)) {
-      sequenceFileWriters.get(fileID).getKey()
-          .append(new Text(keyStr), new Text(valueStr));
-      result = true;
+      // use the key/value holders stored with this writer, not the reader map
+      Writable sequenceKeyWritable = sequenceFileWriters.get(fileID).getValue()
+          .getKey();
+      Writable sequenceValueWritable = sequenceFileWriters.get(fileID)
+          .getValue().getValue();
+
+      // try to read key and value
+      readObject(sequenceKeyWritable);
+      readObject(sequenceValueWritable);
+
+      if ((sequenceKeyWritable != null) && (sequenceValueWritable != null)) {
+
+        // append to sequenceFile
+        sequenceFileWriters.get(fileID).getKey()
+            .append(sequenceKeyWritable, sequenceValueWritable);
+
+        LOG.debug("Stored data: Key: "
+            + ((sequenceKeyWritable.toString().length() < 10) ? sequenceKeyWritable
+                .toString() : sequenceKeyWritable.toString().substring(0, 9)
+                + "...")
+            + " Value: "
+            + ((sequenceValueWritable.toString().length() < 10) ? sequenceValueWritable
+                .toString() : sequenceValueWritable.toString().substring(0, 9)
+                + "..."));
+
+        result = true;
+      }
     }
 
     // RESPOND
-    DataOutputStream stream = binProtocol.getStream();
-    WritableUtils.writeVInt(stream, MessageType.SEQFILE_APPEND.code);
-    WritableUtils.writeVInt(stream, result ? 1 : 0);
+    WritableUtils.writeVInt(this.outStream, MessageType.SEQFILE_APPEND.code);
+    WritableUtils.writeVInt(this.outStream, result ? 1 : 0);
     binProtocol.flush();
     LOG.debug("Responded MessageType.SEQFILE_APPEND - Result: " + result);
   }
 
   public void seqFileClose() throws IOException {
-    int fileID = WritableUtils.readVInt(inStream);
+    int fileID = WritableUtils.readVInt(this.inStream);
 
     boolean result = false;
 
@@ -471,15 +567,14 @@ public class UplinkReader<K1 extends Wri
     }
 
     // RESPOND
-    DataOutputStream stream = binProtocol.getStream();
-    WritableUtils.writeVInt(stream, MessageType.SEQFILE_CLOSE.code);
-    WritableUtils.writeVInt(stream, result ? 1 : 0);
+    WritableUtils.writeVInt(this.outStream, MessageType.SEQFILE_CLOSE.code);
+    WritableUtils.writeVInt(this.outStream, result ? 1 : 0);
     binProtocol.flush();
     LOG.debug("Responded MessageType.SEQFILE_CLOSE - Result: " + result);
   }
 
   public void partitionResponse() throws IOException {
-    int partResponse = WritableUtils.readVInt(inStream);
+    int partResponse = WritableUtils.readVInt(this.inStream);
     synchronized (binProtocol.resultLock) {
       binProtocol.setResult(partResponse);
       LOG.debug("Received MessageType.PARTITION_RESPONSE - Result: "
@@ -488,29 +583,61 @@ public class UplinkReader<K1 extends Wri
     }
   }
 
+  /**
+   * Read the given object from the input stream. Text and BytesWritable are
+   * read as a VInt length followed by that many bytes; IntWritable and
+   * LongWritable are read as a VInt/VLong. Any other Writable is read via its
+   * own readFields(); a NullWritable cannot be read and raises an IOException.
+   * 
+   * @param obj the object to read
+   * @throws IOException
+   */
   protected void readObject(Writable obj) throws IOException {
-    int numBytes = readCommand();
     byte[] buffer;
+
     // For BytesWritable and Text, use the specified length to set the length
     // this causes the "obvious" translations to work. So that if you emit
     // a string "abc" from C++, it shows up as "abc".
-    if (obj instanceof BytesWritable) {
-      buffer = new byte[numBytes];
-      inStream.readFully(buffer);
-      ((BytesWritable) obj).set(buffer, 0, numBytes);
-    } else if (obj instanceof Text) {
+
+    if (obj instanceof Text) {
+      int numBytes = WritableUtils.readVInt(this.inStream);
       buffer = new byte[numBytes];
-      inStream.readFully(buffer);
+      this.inStream.readFully(buffer);
       ((Text) obj).set(buffer);
+
+    } else if (obj instanceof BytesWritable) {
+      int numBytes = WritableUtils.readVInt(this.inStream);
+      buffer = new byte[numBytes];
+      this.inStream.readFully(buffer);
+      ((BytesWritable) obj).set(buffer, 0, numBytes);
+
+    } else if (obj instanceof IntWritable) {
+      LOG.debug("read IntWritable");
+      ((IntWritable) obj).set(WritableUtils.readVInt(this.inStream));
+
+    } else if (obj instanceof LongWritable) {
+      ((LongWritable) obj).set(WritableUtils.readVLong(this.inStream));
+
+      // else if ((obj instanceof FloatWritable) || (obj instanceof
+      // DoubleWritable))
+
     } else if (obj instanceof NullWritable) {
-      throw new IOException(
-          "Cannot read data into NullWritable! Check OutputClasses!");
+      throw new IOException("Cannot read data into NullWritable!");
+
     } else {
-      /* TODO */
-      /* IntWritable, DoubleWritable */
-      throw new IOException(
-          "Hama Pipes does only support Text as Key/Value output!");
-      // obj.readFields(inStream);
+      // Note: other types are transfered as String which should be implemented
+      // in Writable itself
+      try {
+        LOG.debug("reading other type");
+        // try reading object
+        obj.readFields(this.inStream);
+        // String s = Text.readString(inStream);
+
+      } catch (IOException e) {
+
+        throw new IOException("Hama Pipes is not able to read "
+            + obj.getClass().getName(), e);
+      }
     }
   }
 }

Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/util/Generator.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/Generator.java?rev=1544761&r1=1544760&r2=1544761&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/util/Generator.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/util/Generator.java Sat Nov 23 08:41:09 2013
@@ -27,6 +27,8 @@ public class Generator {
       System.out
           .println("  fastgen: Generate random matrix, which can be used as a input of graph examples and is faster than symmetric.");
       System.out.println("  square: Generate random square matrix.");
+      System.out
+          .println("  vectorwritablematrix: Generate a random matrix, consisting of VectorWritables.");
       System.exit(1);
     }
 
@@ -37,6 +39,8 @@ public class Generator {
       SymmetricMatrixGen.main(newArgs);
     } else if (args[0].equals("fastgen")) {
       FastGraphGen.main(newArgs);
+    } else if (args[0].equals("vectorwritablematrix")) {
+      VectorWritableMatrixGen.main(newArgs);
     } else if (args[0].equals("square")) {
       System.out.println("Not implemented yet.");
       // SquareMatrixGen.main(newArgs);

Added: hama/trunk/examples/src/main/java/org/apache/hama/examples/util/VectorWritableMatrixGen.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/VectorWritableMatrixGen.java?rev=1544761&view=auto
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/util/VectorWritableMatrixGen.java (added)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/util/VectorWritableMatrixGen.java Sat Nov 23 08:41:09 2013
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples.util;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.commons.io.PipesVectorWritable;
+import org.apache.hama.commons.io.VectorWritable;
+import org.apache.hama.commons.math.DenseDoubleVector;
+
+public class VectorWritableMatrixGen {
+  private static final Log LOG = LogFactory
+      .getLog(VectorWritableMatrixGen.class);
+
+  public static void main(String[] args) throws InterruptedException,
+      IOException, ClassNotFoundException {
+
+    boolean saveTransposed = false;
+    boolean usePipesVectorWritable = false;
+    double minValue = 0;
+    double maxValue = 1000;
+    int precision = 3;
+
+    // check arguments
+    if (args.length < 3) {
+      System.out
+          .println("Usage: <rowSize> <colSize> <outputPath>"
+              + " [<saveTransposed=true|false(default)>] [<usePipesVectorWritable=true|false(default)>]"
+              + " [<minValue=" + minValue + ">] [<maxValue=" + maxValue + ">]"
+              + " [<precision=" + precision + ">]");
+      System.out
+          .println("e.g., hama jar hama-examples-*.jar gen vectorwritablematrix 10 10 /tmp/matrix.seq");
+      System.out
+          .println("      hama jar hama-examples-*.jar gen vectorwritablematrix 10 10 /tmp/matrix.seq"
+              + " false false 0 10 2");
+      System.exit(1);
+    }
+
+    int rowSize = Integer.parseInt(args[0]);
+    int colSize = Integer.parseInt(args[1]);
+    Path outputPath = new Path(args[2]);
+
+    if (args.length > 3) {
+      saveTransposed = Boolean.parseBoolean(args[3]);
+      if (args.length > 4) {
+        usePipesVectorWritable = Boolean.parseBoolean(args[4]);
+        if (args.length > 5) {
+          minValue = Double.parseDouble(args[5]);
+          if (args.length > 6) {
+            maxValue = Double.parseDouble(args[6]);
+            if (args.length > 7) {
+              precision = Integer.parseInt(args[7]);
+            }
+          }
+        }
+      }
+    }
+
+    LOG.debug("rowSize: " + rowSize + " colSize: " + colSize + " outputPath: "
+        + outputPath);
+    LOG.debug("saveTransposed: " + saveTransposed + " usePipesVectorWritable: "
+        + usePipesVectorWritable);
+    LOG.debug("minValue: " + minValue + " maxValue: " + maxValue
+        + " precision: " + precision);
+
+    // create random double matrix
+    double[][] matrix = createRandomMatrix(rowSize, colSize, new Random(),
+        minValue, maxValue, precision);
+
+    // write matrix to dfs
+    writeMatrix(matrix, outputPath, saveTransposed, usePipesVectorWritable);
+  }
+
+  public static double[][] createRandomMatrix(int rows, int columns,
+      Random rand, double rangeMin, double rangeMax, int precision) {
+
+    LOG.debug("createRandomMatrix rows: " + rows + " cols: " + columns);
+
+    final double[][] matrix = new double[rows][columns];
+
+    for (int i = 0; i < rows; i++) {
+      for (int j = 0; j < columns; j++) {
+        double randomValue = rangeMin + (rangeMax - rangeMin)
+            * rand.nextDouble();
+
+        matrix[i][j] = new BigDecimal(randomValue).setScale(precision,
+            BigDecimal.ROUND_DOWN).doubleValue();
+      }
+    }
+    return matrix;
+  }
+
+  public static Path writeMatrix(double[][] matrix, Path path,
+      boolean saveTransposed, boolean usePipesVectorWritable) {
+    // Stores each row as an (IntWritable row index, vector) record at 'path'.
+    LOG.debug("writeMatrix path: " + path + " saveTransposed: "
+        + saveTransposed + " usePipesVectorWritable: " + usePipesVectorWritable);
+
+    // transpose matrix before saving
+    if (saveTransposed) {
+      int rows = matrix.length;
+      int columns = (rows == 0) ? 0 : matrix[0].length; // empty matrix: no row 0
+      double[][] transposed = new double[columns][rows];
+      for (int i = 0; i < rows; i++) {
+        for (int j = 0; j < columns; j++) {
+          transposed[j][i] = matrix[i][j];
+        }
+      }
+      matrix = transposed;
+    }
+
+    // Write matrix to DFS
+    HamaConfiguration conf = new HamaConfiguration();
+    SequenceFile.Writer writer = null;
+    try {
+      FileSystem fs = FileSystem.get(conf);
+      // use PipesVectorWritable if specified
+      if (usePipesVectorWritable) {
+        writer = new SequenceFile.Writer(fs, conf, path, IntWritable.class,
+            PipesVectorWritable.class);
+
+        for (int i = 0; i < matrix.length; i++) {
+          DenseDoubleVector rowVector = new DenseDoubleVector(matrix[i]);
+          writer.append(new IntWritable(i), new PipesVectorWritable(rowVector));
+          LOG.debug("IntWritable: " + i + " PipesVectorWritable: "
+              + rowVector.toString());
+        }
+
+      } else {
+        writer = new SequenceFile.Writer(fs, conf, path, IntWritable.class,
+            VectorWritable.class);
+
+        for (int i = 0; i < matrix.length; i++) {
+          DenseDoubleVector rowVector = new DenseDoubleVector(matrix[i]);
+          writer.append(new IntWritable(i), new VectorWritable(rowVector));
+          LOG.debug("IntWritable: " + i + " VectorWritable: "
+              + rowVector.toString());
+        }
+      }
+
+    } catch (IOException e) {
+      LOG.error("Could not write matrix to " + path, e);
+    } finally {
+      if (writer != null) {
+        try {
+          writer.close();
+        } catch (IOException e) {
+          LOG.error("Could not close SequenceFile writer for " + path, e);
+        }
+      }
+    }
+    return path;
+  }
+
+}

Modified: hama/trunk/src/assemble/bin.xml
URL: http://svn.apache.org/viewvc/hama/trunk/src/assemble/bin.xml?rev=1544761&r1=1544760&r2=1544761&view=diff
==============================================================================
--- hama/trunk/src/assemble/bin.xml (original)
+++ hama/trunk/src/assemble/bin.xml Sat Nov 23 08:41:09 2013
@@ -103,14 +103,6 @@
       <outputDirectory>../hama-${project.version}/lib</outputDirectory>
     </fileSet>
     <fileSet>
-      <directory>../c++/target/native</directory>
-      <includes>
-        <include>*.a</include>
-      </includes>
-      <fileMode>755</fileMode>
-      <outputDirectory>../hama-${project.version}/lib/native</outputDirectory>
-    </fileSet>
-    <fileSet>
       <directory>../bin</directory>
       <includes>
         <include>hama</include>



Mime
View raw message