hama-commits mailing list archives

From: millec...@apache.org
Subject: svn commit: r1557100 [1/2] - in /hama/trunk: ./ c++/src/main/native/examples/conf/ c++/src/main/native/examples/impl/ c++/src/main/native/pipes/api/hama/ c++/src/main/native/pipes/impl/ core/src/main/java/org/apache/hama/pipes/ core/src/main/java/org/a...
Date: Fri, 10 Jan 2014 11:59:46 GMT
Author: millecker
Date: Fri Jan 10 11:59:46 2014
New Revision: 1557100

URL: http://svn.apache.org/r1557100
Log:
HAMA-839: Support NullWritable in Hama Pipes
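
A minimal usage sketch (not part of this commit) of what the change enables: when a BSP's input key/value and output key are NullWritable, the corresponding template parameters become void and context.write() takes only the value. The class and member names below are illustrative; the includes and the runTask/TemplateFactory call follow the piestimator example updated in this commit.

// Illustrative sketch only: input key/value and output key are NullWritable,
// expressed as 'void' template parameters (K1, V1, K2 = void).
#include "hama/Pipes.hh"
#include "hama/TemplateFactory.hh"

using HamaPipes::BSP;
using HamaPipes::BSPContext;

class NullKeyBSP: public BSP<void,void,void,double,int> {
public:
  NullKeyBSP(BSPContext<void,void,void,double,int>& context) { }

  // setup() and cleanup() now have empty default implementations,
  // so only bsp() has to be overridden.
  void bsp(BSPContext<void,void,void,double,int>& context) {
    // With a void output key, write() takes only the value.
    context.write(42.0);
    context.sync();
  }
};

int main(int argc, char *argv[]) {
  return HamaPipes::runTask<void,void,void,double,int>(
      HamaPipes::TemplateFactory<NullKeyBSP,void,void,void,double,int>());
}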

Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/c++/src/main/native/examples/conf/matrixmultiplication.xml
    hama/trunk/c++/src/main/native/examples/conf/piestimator.xml
    hama/trunk/c++/src/main/native/examples/conf/summation.xml
    hama/trunk/c++/src/main/native/examples/impl/matrixmultiplication.cc
    hama/trunk/c++/src/main/native/examples/impl/piestimator.cc
    hama/trunk/c++/src/main/native/examples/impl/summation.cc
    hama/trunk/c++/src/main/native/pipes/api/hama/Pipes.hh
    hama/trunk/c++/src/main/native/pipes/impl/Pipes.cc
    hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesApplication.java
    hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesBSP.java
    hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java
    hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/BinaryProtocol.java
    hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/DownwardProtocol.java
    hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java
    hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java
    hama/trunk/core/src/main/java/org/apache/hama/pipes/util/SequenceFileDumper.java
    hama/trunk/core/src/test/java/org/apache/hama/pipes/TestPipes.java

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1557100&r1=1557099&r2=1557100&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Fri Jan 10 11:59:46 2014
@@ -3,7 +3,8 @@ Hama Change Log
 Release 0.7.0 (unreleased changes)
 
   NEW FEATURES
-  
+
+   HAMA-839: Support NullWritable in Hama Pipes (Martin Illecker)    
    HAMA-837: Add sort behaviour to runtime partitioner (edwardyoon)
    HAMA-827: Add NamedVector (edwardyoon)
    HAMA-822: Add feature transformer interface to improve the power and flexibility of existing machine learning model (Yexi Jiang)

Modified: hama/trunk/c++/src/main/native/examples/conf/matrixmultiplication.xml
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/main/native/examples/conf/matrixmultiplication.xml?rev=1557100&r1=1557099&r2=1557100&view=diff
==============================================================================
--- hama/trunk/c++/src/main/native/examples/conf/matrixmultiplication.xml (original)
+++ hama/trunk/c++/src/main/native/examples/conf/matrixmultiplication.xml Fri Jan 10 11:59:46 2014
@@ -22,14 +22,6 @@
     <value>hdfs:/examples/bin/matrixmultiplication</value>
   </property>
   <property>
-    <name>hama.pipes.java.recordreader</name>
-    <value>true</value>
-  </property>
-  <property>
-    <name>hama.pipes.java.recordwriter</name>
-    <value>true</value>
-  </property>
-  <property>
     <name>bsp.input.dir</name>
     <value>/examples/input/matrixmultiplication/MatrixA.seq</value>                                            
   </property>

Modified: hama/trunk/c++/src/main/native/examples/conf/piestimator.xml
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/main/native/examples/conf/piestimator.xml?rev=1557100&r1=1557099&r2=1557100&view=diff
==============================================================================
--- hama/trunk/c++/src/main/native/examples/conf/piestimator.xml (original)
+++ hama/trunk/c++/src/main/native/examples/conf/piestimator.xml Fri Jan 10 11:59:46 2014
@@ -22,14 +22,6 @@
     <value>hdfs:/examples/bin/piestimator</value>
   </property>
   <property>
-    <name>hama.pipes.java.recordreader</name>
-    <value>true</value>
-  </property>
-  <property>
-    <name>hama.pipes.java.recordwriter</name>
-    <value>true</value>
-  </property>
-  <property>
     <name>bsp.input.format.class</name>
     <value>org.apache.hama.bsp.NullInputFormat</value>
   </property>
@@ -39,7 +31,7 @@
   </property>
   <property>
     <name>bsp.output.key.class</name>
-    <value>org.apache.hadoop.io.Text</value>
+    <value>org.apache.hadoop.io.NullWritable</value>
   </property>
   <property>
     <name>bsp.output.value.class</name>

Modified: hama/trunk/c++/src/main/native/examples/conf/summation.xml
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/main/native/examples/conf/summation.xml?rev=1557100&r1=1557099&r2=1557100&view=diff
==============================================================================
--- hama/trunk/c++/src/main/native/examples/conf/summation.xml (original)
+++ hama/trunk/c++/src/main/native/examples/conf/summation.xml Fri Jan 10 11:59:46 2014
@@ -22,14 +22,6 @@
     <value>hdfs:/examples/bin/summation</value>
   </property>
   <property>
-    <name>hama.pipes.java.recordreader</name>
-    <value>true</value>
-  </property>
-  <property>
-    <name>hama.pipes.java.recordwriter</name>
-    <value>true</value>
-  </property>
-  <property>
     <name>bsp.input.format.class</name>
     <value>org.apache.hama.bsp.KeyValueTextInputFormat</value>
   </property>
@@ -47,7 +39,7 @@
   </property>
   <property>
     <name>bsp.output.key.class</name>
-    <value>org.apache.hadoop.io.Text</value>
+    <value>org.apache.hadoop.io.NullWritable</value>
   </property>
   <property>
     <name>bsp.output.value.class</name>

Modified: hama/trunk/c++/src/main/native/examples/impl/matrixmultiplication.cc
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/main/native/examples/impl/matrixmultiplication.cc?rev=1557100&r1=1557099&r2=1557100&view=diff
==============================================================================
--- hama/trunk/c++/src/main/native/examples/impl/matrixmultiplication.cc (original)
+++ hama/trunk/c++/src/main/native/examples/impl/matrixmultiplication.cc Fri Jan 10 11:59:46 2014
@@ -130,7 +130,7 @@ public:
   
 };
 
-class MatrixRowPartitioner: public Partitioner<int,string,int,string,string> {
+class MatrixRowPartitioner: public Partitioner<int,string> {
 public:
   MatrixRowPartitioner(BSPContext<int,string,int,string,string>& context) { }
   

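For reference, a hedged sketch (not part of this commit) of a partitioner against the reduced interface shown above: Partitioner is now parameterized on the input key/value types only, so an implementation overrides partition(key, value, num_tasks) with just those two types bound. The class name and routing logic are illustrative.

#include "hama/Pipes.hh"

using std::string;
using HamaPipes::Partitioner;

// Illustrative only: route each record to a task by its integer key.
class ModuloPartitioner: public Partitioner<int,string> {
public:
  int partition(const int& key, const string& value, int32_t num_tasks) {
    return key % num_tasks;
  }
};
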
Modified: hama/trunk/c++/src/main/native/examples/impl/piestimator.cc
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/main/native/examples/impl/piestimator.cc?rev=1557100&r1=1557099&r2=1557100&view=diff
==============================================================================
--- hama/trunk/c++/src/main/native/examples/impl/piestimator.cc (original)
+++ hama/trunk/c++/src/main/native/examples/impl/piestimator.cc Fri Jan 10 11:59:46 2014
@@ -18,7 +18,6 @@
 
 #include "hama/Pipes.hh"
 #include "hama/TemplateFactory.hh"
-#include "hadoop/StringUtils.hh"
 
 #include <time.h>
 #include <math.h>
@@ -27,32 +26,30 @@
 #include <iostream>
 
 using std::string;
-using std::cout;
 
 using HamaPipes::BSP;
 using HamaPipes::BSPContext;
-using namespace HadoopUtils;
 
-class PiEstimatorBSP: public BSP<string,string,string,double,int> {
-  private:
+class PiEstimatorBSP: public BSP<void,void,void,double,int> {
+private:
   string master_task_;
   long iterations_; // iterations_per_bsp_task
   
-  public:
-  PiEstimatorBSP(BSPContext<string,string,string,double,int>& context) {
-    iterations_ = 1000000L;
+public:
+  PiEstimatorBSP(BSPContext<void,void,void,double,int>& context) {
+    iterations_ = 10000000L;
   }
   
   inline double closed_interval_rand(double x0, double x1) {
     return x0 + (x1 - x0) * rand() / ((double) RAND_MAX);
   }
   
-  void setup(BSPContext<string,string,string,double,int>& context) {
+  void setup(BSPContext<void,void,void,double,int>& context) {
     // Choose one as a master
     master_task_ = context.getPeerName(context.getNumPeers() / 2);
   }
   
-  void bsp(BSPContext<string,string,string,double,int>& context) {
+  void bsp(BSPContext<void,void,void,double,int>& context) {
     
     /* initialize random seed */
     srand(time(NULL));
@@ -70,7 +67,7 @@ class PiEstimatorBSP: public BSP<string,
     context.sync();
   }
   
-  void cleanup(BSPContext<string,string,string,double,int>& context) {
+  void cleanup(BSPContext<void,void,void,double,int>& context) {
     if (context.getPeerName().compare(master_task_)==0) {
       
       long total_hits = 0;
@@ -80,12 +77,12 @@ class PiEstimatorBSP: public BSP<string,
       }
       
       double pi = 4.0 * total_hits / (msg_count * iterations_);
-      context.write("Estimated value of PI", pi);
+      context.write(pi);
     }
   }
 };
 
 int main(int argc, char *argv[]) {
-  return HamaPipes::runTask<string,string,string,double,int>(HamaPipes::TemplateFactory<PiEstimatorBSP,string,string,string,double,int>());
+  return HamaPipes::runTask<void,void,void,double,int>(HamaPipes::TemplateFactory<PiEstimatorBSP,void,void,void,double,int>());
 }
 

Modified: hama/trunk/c++/src/main/native/examples/impl/summation.cc
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/main/native/examples/impl/summation.cc?rev=1557100&r1=1557099&r2=1557100&view=diff
==============================================================================
--- hama/trunk/c++/src/main/native/examples/impl/summation.cc (original)
+++ hama/trunk/c++/src/main/native/examples/impl/summation.cc Fri Jan 10 11:59:46 2014
@@ -29,25 +29,27 @@ using std::string;
 using HamaPipes::BSP;
 using HamaPipes::BSPContext;
 
-class SummationBSP: public BSP<string,string,string,double,double> {
-  private:
+class SummationBSP: public BSP<string,string,void,double,double> {
+private:
   string master_task_;
   
-  public:
-  SummationBSP(BSPContext<string,string,string,double,double>& context) {  }
+public:
+  SummationBSP(BSPContext<string,string,void,double,double>& context) {  }
   
-  void setup(BSPContext<string,string,string,double,double>& context) {
+  void setup(BSPContext<string,string,void,double,double>& context) {
     // Choose one as a master
     master_task_ = context.getPeerName(context.getNumPeers() / 2);
   }
   
-  void bsp(BSPContext<string,string,string,double,double>& context) {
+  void bsp(BSPContext<string,string,void,double,double>& context) {
     
     double intermediate_sum = 0.0;
     string key;
     string value;
     
     while(context.readNext(key,value)) {
+      // We are using the KeyValueTextInputFormat,
+      // therefore we have to convert string value to double
       intermediate_sum += HadoopUtils::toDouble(value);
     }
     
@@ -55,7 +57,7 @@ class SummationBSP: public BSP<string,st
     context.sync();
   }
   
-  void cleanup(BSPContext<string,string,string,double,double>& context) {
+  void cleanup(BSPContext<string,string,void,double,double>& context) {
     if (context.getPeerName().compare(master_task_)==0) {
       
       double sum = 0.0;
@@ -63,12 +65,12 @@ class SummationBSP: public BSP<string,st
       for (int i=0; i < msg_count; i++) {
         sum += context.getCurrentMessage();
       }
-      context.write("Sum", sum);
+      context.write(sum);
     }
   }
 };
 
 int main(int argc, char *argv[]) {
-  return HamaPipes::runTask<string,string,string,double,double>(HamaPipes::TemplateFactory<SummationBSP,string,string,string,double,double>());
+  return HamaPipes::runTask<string,string,void,double,double>(HamaPipes::TemplateFactory<SummationBSP,string,string,void,double,double>());
 }
 

Modified: hama/trunk/c++/src/main/native/pipes/api/hama/Pipes.hh
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/main/native/pipes/api/hama/Pipes.hh?rev=1557100&r1=1557099&r2=1557100&view=diff
==============================================================================
--- hama/trunk/c++/src/main/native/pipes/api/hama/Pipes.hh (original)
+++ hama/trunk/c++/src/main/native/pipes/api/hama/Pipes.hh Fri Jan 10 11:59:46 2014
@@ -44,8 +44,6 @@ using std::string;
 using std::vector;
 using std::pair;
 
-using namespace HadoopUtils;
-
 namespace HamaPipes {
   
   // global varibales
@@ -91,6 +89,9 @@ namespace HamaPipes {
     stringify( LOG ), stringify( END_OF_DATA )
   };
   
+  /********************************************/
+  /*************** KeyValuePair ***************/
+  /********************************************/
   /**
    * Generic KeyValuePair including is_empty
    */
@@ -103,10 +104,10 @@ namespace HamaPipes {
     explicit KeyValuePair(bool x) : is_empty(x) {}
     KeyValuePair(const K& k, const V& v) : base_t(k, v), is_empty(false) {}
     
-    template <class X, class Y>
+    template <typename X, typename Y>
     KeyValuePair(const pair<X,Y> &p) : base_t(p), is_empty(false) {}
     
-    template <class X, class Y>
+    template <typename X, typename Y>
     KeyValuePair(const KeyValuePair<X,Y> &p) : base_t(p), is_empty(p.is_empty) {}
   };
   
@@ -159,24 +160,24 @@ namespace HamaPipes {
     virtual const string& get(const string& key) const {
       map<string,string>::const_iterator itr = values_.find(key);
       if (itr == values_.end()) {
-        throw Error("Key " + key + " not found in BSPJob");
+        throw HadoopUtils::Error("Key " + key + " not found in BSPJob");
       }
       return itr->second;
     }
     
     virtual int getInt(const string& key) const {
       const string& val = get(key);
-      return toInt(val);
+      return HadoopUtils::toInt(val);
     }
     
     virtual float getFloat(const string& key) const {
       const string& val = get(key);
-      return toFloat(val);
+      return HadoopUtils::toFloat(val);
     }
     
     virtual bool getBoolean(const string& key) const {
       const string& val = get(key);
-      return toBool(val);
+      return HadoopUtils::toBool(val);
     }
   };
   
@@ -250,20 +251,52 @@ namespace HamaPipes {
   };
   
   /********************************************/
+  /******** DownwardProtocolPartition *********/
+  /********************************************/
+  /* DownwardProtocolPartition wraps void template parameter */
+  template<typename D, typename K, typename V>
+  class DownwardProtocolPartition {
+  public:
+    void runPartition(const K& key, const V& value, int32_t num_tasks) {
+      static_cast<D*>(this)->template runPartition<K,V>(key, value, num_tasks);
+    }
+  };
+  
+  template<typename D, typename K>
+  class DownwardProtocolPartition<D, K, void> {
+  public:
+    void runPartition(const K& key, int32_t num_tasks) {
+      static_cast<D*>(this)->template runPartition<K>(key, num_tasks);
+    }
+  };
+  
+  template<typename D, typename V>
+  class DownwardProtocolPartition<D, void, V> {
+  public:
+    void runPartition(const V& value, int32_t num_tasks) {
+      static_cast<D*>(this)->template runPartition<V>(value, num_tasks);
+    }
+  };
+  
+  template<typename D>
+  class DownwardProtocolPartition<D, void, void> {
+  public:
+    /* Partition nothing */
+  };
+  
+  /********************************************/
   /************* DownwardProtocol *************/
   /********************************************/
-  template<class K1, class V1>
-  class DownwardProtocol {
+  template<typename D, typename K1, typename V1>
+  class DownwardProtocol : public DownwardProtocolPartition<D, K1, V1>{
   public:
     virtual void start(int protocol_version) = 0;
     virtual void setBSPJob(vector<string> values) = 0;
     virtual void setInputTypes(string key_type, string value_type) = 0;
     
-    virtual void runBsp(bool piped_input, bool piped_output) = 0;
-    virtual void runCleanup(bool piped_input, bool piped_output) = 0;
-    virtual void runSetup(bool piped_input, bool piped_output) = 0;
-    
-    virtual void runPartition(const K1& key, const V1& value, int32_t num_tasks) = 0;
+    virtual void runBsp() = 0;
+    virtual void runCleanup() = 0;
+    virtual void runSetup() = 0;
     
     virtual void close() = 0;
     virtual void abort() = 0;
@@ -278,22 +311,27 @@ namespace HamaPipes {
   public:
     virtual void sendCommand(int32_t cmd) = 0;
     
-    template<class T>
+    template<typename T>
     void sendCommand(int32_t cmd, T value) {
       static_cast<D*>(this)->template sendCommand<T>(cmd, value);
     }
     
-    template<class T>
+    template<typename T>
     void sendCommand(int32_t cmd, const T values[], int size) {
       static_cast<D*>(this)->template sendCommand<T>(cmd, values, size);
     }
     
-    template<class T1, class T2>
+    template<typename T1, typename T2>
     void sendCommand(int32_t cmd, T1 value1, T2 value2) {
       static_cast<D*>(this)->template sendCommand<T1,T2>(cmd, value1, value2);
     }
     
-    template<class T1, class T2>
+    template<typename T1, typename T2, typename T3>
+    void sendCommand(int32_t cmd, T1 value1, T2 value2, T3 value3) {
+      static_cast<D*>(this)->template sendCommand<T1,T2,T3>(cmd, value1, value2, value3);
+    }
+    
+    template<typename T1, typename T2>
     void sendCommand(int32_t cmd, T1 value, const T2 values[], int size) {
       static_cast<D*>(this)->template sendCommand<T1,T2>(cmd, value, values, size);
     }
@@ -304,32 +342,75 @@ namespace HamaPipes {
     virtual ~UpwardProtocol() {}
   };
   
+  /********************************************/
+  /*********** ProtocolEventHandler ***********/
+  /********************************************/
+  /* ProtocolEventHandler wraps void template parameter */
+  template<typename D, typename K, typename V>
+  class ProtocolEventHandler {
+  public:
+    void nextEvent() {
+      static_cast<D*>(this)->template nextEvent<K,V>();
+    }
+  };
+  
+  template<typename D, typename K>
+  class ProtocolEventHandler<D, K, void> {
+  public:
+    void nextEvent() {
+      static_cast<D*>(this)->template nextEvent<K>();
+    }
+  };
+  
+  template<typename D, typename V>
+  class ProtocolEventHandler<D, void, V> {
+  public:
+    void nextEvent() {
+      static_cast<D*>(this)->template nextEvent<V>();
+    }
+  };
+  
+  template<typename D>
+  class ProtocolEventHandler<D, void, void> {
+  public:
+    void nextEvent() {
+      static_cast<D*>(this)->nextEvent();
+    }
+  };
+  
+  /********************************************/
+  /*********** BinaryUpwardProtocol ***********/
+  /********************************************/
   /* Forward definition of BinaryUpwardProtocol to pass to UpwardProtocol */
   class BinaryUpwardProtocol;
   
   /********************************************/
   /***************** Protocol *****************/
   /********************************************/
-  template<typename D>
-  class Protocol {
+  template<typename D, typename K1, typename V1>
+  class Protocol: public ProtocolEventHandler<D, K1, V1> {
   public:
     
-    template<class T>
+    template<typename T>
     T getResult(int32_t expected_response_cmd) {
       return static_cast<D*>(this)->template getResult<T>(expected_response_cmd);
     }
     
-    template<class T>
+    template<typename T>
     vector<T> getVectorResult(int32_t expected_response_cmd) {
       return static_cast<D*>(this)->template getVectorResult<T>(expected_response_cmd);
     }
     
-    template<class K, class V>
+    template<typename K, typename V>
     KeyValuePair<K,V> getKeyValueResult(int32_t expected_response_cmd) {
       return static_cast<D*>(this)->template getKeyValueResult<K,V>(expected_response_cmd);
     }
     
-    virtual void nextEvent() = 0;
+    template<typename T>
+    KeyValuePair<T,T> getKeyValueResult(int32_t expected_response_cmd) {
+      return static_cast<D*>(this)->template getKeyValueResult<T>(expected_response_cmd);
+    }
+    
     virtual bool verifyResult(int32_t expected_response_cmd) = 0;
     virtual UpwardProtocol<BinaryUpwardProtocol>* getUplink() = 0;
     virtual ~Protocol(){}
@@ -362,28 +443,186 @@ namespace HamaPipes {
      * Read next key/value pair from the SequenceFile with fileID
      * Using Curiously recurring template pattern(CTRP)
      */
-    template<class K, class V>
+    template<typename K, typename V>
     bool sequenceFileReadNext(int32_t file_id, K& key, V& value) {
       return static_cast<D*>(this)->template sequenceFileReadNext<K,V>(file_id, key, value);
     }
     
     /**
+     * Read next key OR value from the SequenceFile with fileID
+     * key OR value type is NullWritable
+     */
+    template<typename T>
+    bool sequenceFileReadNext(int32_t file_id, T& key_or_value) {
+      return static_cast<D*>(this)->template sequenceFileReadNext<T>(file_id, key_or_value);
+    }
+    
+    /**
      * Append the next key/value pair to the SequenceFile with fileID
      * Using Curiously recurring template pattern(CTRP)
      */
-    template<class K, class V>
+    template<typename K, typename V>
     bool sequenceFileAppend(int32_t file_id, const K& key, const V& value) {
       return static_cast<D*>(this)->template sequenceFileAppend<K,V>(file_id, key, value);
     }
+    
+    /**
+     * Append the next key OR value pair to the SequenceFile with fileID
+     * key OR value type is NullWritable
+     */
+    template<typename T>
+    bool sequenceFileAppend(int32_t file_id, const T& key_or_value) {
+      return static_cast<D*>(this)->template sequenceFileAppend<T>(file_id, key_or_value);
+    }
+  };
+  
+  /********************************************/
+  /****************** Reader ******************/
+  /********************************************/
+  /* Reader wraps void template parameter */
+  template<typename D, typename K, typename V>
+  class Reader {
+  public:
+    /**
+     * Deserializes the next input key value into the given objects
+     */
+    bool readNext(K& key, V& value) {
+      return static_cast<D*>(this)->template readNext<K,V>(key, value);
+    }
+    
+    /**
+     * Reads the next key value pair and returns it as a pair. It may reuse a
+     * {@link KeyValuePair} instance to save garbage collection time.
+     *
+     * @return null if there are no records left.
+     * @throws IOException
+     */
+    //public KeyValuePair<K1, V1> readNext() throws IOException;
+  };
+  
+  template<typename D, typename K>
+  class Reader<D, K, void> {
+  public:
+    /**
+     * Deserializes the next input key into the given object
+     * value type is NullWritable
+     */
+    bool readNext(K& key) {
+      return static_cast<D*>(this)->template readNext<K>(key);
+    }
+  };
+  
+  template<typename D, typename V>
+  class Reader<D, void, V> {
+  public:
+    /**
+     * Deserializes the next input value into the given object
+     * key type is NullWritable
+     */
+    bool readNext(V& value) {
+      return static_cast<D*>(this)->template readNext<V>(value);
+    }
+  };
+  
+  template<typename D>
+  class Reader<D, void, void> {
+  public:
+    /* key AND value type are NullWritable */
+    /* Read nothing */
+  };
+  
+  /********************************************/
+  /****************** Writer ******************/
+  /********************************************/
+  /* Writer wraps void template parameter */
+  template<typename D, typename K, typename V>
+  class Writer {
+  public:
+    /**
+     * Writes a key/value pair to the output collector
+     */
+    void write(const K& key, const V& value) {
+      static_cast<D*>(this)->template write<K,V>(key, value);
+    }
+  };
+  
+  template<typename D, typename K>
+  class Writer<D, K, void> {
+  public:
+    /**
+     * Writes a key to the output collector
+     * value type is NullWritable
+     */
+    void write(const K& key) {
+      static_cast<D*>(this)->template write<K>(key);
+    }
   };
   
+  template<typename D, typename V>
+  class Writer<D, void, V> {
+  public:
+    /**
+     * Writes a value to the output collector
+     * key type is NullWritable
+     */
+    void write(const V& value) {
+      static_cast<D*>(this)->template write<V>(value);
+    }
+  };
+  
+  template<typename D>
+  class Writer<D, void, void> {
+  public:
+    /* key AND value type are NullWritable */
+    /* Write nothing */
+  };
+  
+  /********************************************/
+  /**************** Messenger *****************/
+  /********************************************/
+  /* Messenger wraps void template parameter */
+  template<typename D, typename M>
+  class Messenger {
+  public:
+    /**
+     * Send a data with a tag to another BSPSlave corresponding to hostname.
+     * Messages sent by this method are not guaranteed to be received in a sent
+     * order.
+     */
+    void sendMessage(const string& peer_name, const M& msg) {
+      static_cast<D*>(this)->template sendMessage<M>(peer_name, msg);
+    }
+    
+    /**
+     * @return A message from the peer's received messages queue (a FIFO).
+     */
+    virtual M getCurrentMessage() {
+      return static_cast<D*>(this)->template getCurrentMessage<M>();
+    }
+  };
+  
+  template<typename D>
+  class Messenger<D, void> {
+  public:
+    /* message type is NullWritable */
+    /* therefore messenger is not available */
+  };
+  
+  /********************************************/
+  /************** BSPContextImpl **************/
+  /********************************************/
   /* Forward definition of BSPContextImpl to pass to SequenceFileConnector */
-  template<class K1, class V1, class K2, class V2, class M>
+  template<typename K1, typename V1, typename K2, typename V2, typename M>
   class BSPContextImpl;
   
-  
-  template<class K1, class V1, class K2, class V2, class M>
-  class BSPContext: public TaskContext, public SequenceFileConnector<BSPContextImpl<K1, V1, K2, V2, M> > {
+  /********************************************/
+  /***************** BSPContext ***************/
+  /********************************************/
+  template<typename K1, typename V1, typename K2, typename V2, typename M>
+  class BSPContext: public TaskContext, public SequenceFileConnector<BSPContextImpl<K1, V1, K2, V2, M> >,
+  public Reader<BSPContextImpl<K1, V1, K2, V2, M>, K1, V1>,
+  public Writer<BSPContextImpl<K1, V1, K2, V2, M>, K2, V2>,
+  public Messenger<BSPContextImpl<K1, V1, K2, V2, M>, M> {
   public:
     
     /**
@@ -402,18 +641,6 @@ namespace HamaPipes {
     virtual string getInputValueClass() = 0;
     
     /**
-     * Send a data with a tag to another BSPSlave corresponding to hostname.
-     * Messages sent by this method are not guaranteed to be received in a sent
-     * order.
-     */
-    virtual void sendMessage(const string& peer_name, const M& msg) = 0;
-    
-    /**
-     * @return A message from the peer's received messages queue (a FIFO).
-     */
-    virtual M getCurrentMessage() = 0;
-    
-    /**
      * @return The number of messages in the peer's received messages queue.
      */
     virtual int getNumCurrentMessages() = 0;
@@ -463,49 +690,37 @@ namespace HamaPipes {
     virtual void clear() = 0;
     
     /**
-     * Writes a key/value pair to the output collector
-     */
-    virtual void write(const K2& key, const V2& value) = 0;
-    
-    /**
-     * Deserializes the next input key value into the given objects;
-     */
-    virtual bool readNext(K1& key, V1& value) = 0;
-    
-    /**
-     * Reads the next key value pair and returns it as a pair. It may reuse a
-     * {@link KeyValuePair} instance to save garbage collection time.
-     *
-     * @return null if there are no records left.
-     * @throws IOException
-     */
-    //public KeyValuePair<K1, V1> readNext() throws IOException;
-    
-    /**
      * Closes the input and opens it right away, so that the file pointer is at
      * the beginning again.
      */
     virtual void reopenInput() = 0;
-    
   };
   
+  /********************************************/
+  /****************** Closable ****************/
+  /********************************************/
   class Closable {
   public:
     virtual void close() {}
     virtual ~Closable() {}
   };
   
+  /********************************************/
+  /******************* BSP ********************/
+  /********************************************/
   /**
    * The application's BSP class to do bsp.
    */
-  template<class K1, class V1, class K2, class V2, class M>
+  template<typename K1, typename V1, typename K2, typename V2, typename M>
   class BSP: public Closable {
   public:
     /**
      * This method is called before the BSP method. It can be used for setup
      * purposes.
      */
-    virtual void setup(BSPContext<K1, V1, K2, V2, M>& context) = 0;
+    virtual void setup(BSPContext<K1, V1, K2, V2, M>& context) {
+      // empty implementation, because overriding is optional
+    }
     
     /**
      * This method is your computation method, the main work of your BSP should be
@@ -518,49 +733,51 @@ namespace HamaPipes {
      * purposes. Cleanup is guranteed to be called after the BSP runs, even in
      * case of exceptions.
      */
-    virtual void cleanup(BSPContext<K1, V1, K2, V2, M>& context) = 0;
+    virtual void cleanup(BSPContext<K1, V1, K2, V2, M>& context) {
+      // empty implementation, because overriding is optional
+    }
   };
   
+  /********************************************/
+  /**************** Partitioner ***************/
+  /********************************************/
   /**
    * User code to decide where each key should be sent.
    */
-  template<class K1, class V1, class K2, class V2, class M>
+  template<typename K1, typename V1>
   class Partitioner {
   public:
-    
     virtual int partition(const K1& key, const V1& value, int32_t num_tasks) = 0;
     virtual ~Partitioner() {}
   };
   
-  /**
-   * For applications that want to read the input directly for the map function
-   * they can define RecordReaders in C++.
-   */
-  template<class K, class V>
-  class RecordReader: public Closable {
+  template<typename K1>
+  class Partitioner<K1, void> {
   public:
-    virtual bool next(K& key, V& value) = 0;
-    
-    /**
-     * The progress of the record reader through the split as a value between
-     * 0.0 and 1.0.
-     */
-    virtual float getProgress() = 0;
+    virtual int partition(const K1& key, int32_t num_tasks) = 0;
+    virtual ~Partitioner() {}
   };
   
-  /**
-   * An object to write key/value pairs as they are emited from the reduce.
-   */
-  template<class K, class V>
-  class RecordWriter: public Closable {
+  template<typename V1>
+  class Partitioner<void, V1> {
   public:
-    virtual void emit(const K& key, const V& value) = 0;
+    virtual int partition(const V1& value, int32_t num_tasks) = 0;
+    virtual ~Partitioner() {}
   };
   
+  template<>
+  class Partitioner<void, void> {
+  public:
+    /* Partition nothing */
+  };
+  
+  /********************************************/
+  /****************** Factory *****************/
+  /********************************************/
   /**
    * A factory to create the necessary application objects.
    */
-  template<class K1, class V1, class K2, class V2, class M>
+  template<typename K1, typename V1, typename K2, typename V2, typename M>
   class Factory {
   public:
     virtual BSP<K1, V1, K2, V2, M>* createBSP(BSPContext<K1, V1, K2, V2, M>& context) const = 0;
@@ -570,35 +787,20 @@ namespace HamaPipes {
      * @return the new partitioner or NULL, if the default partitioner should be
      * used.
      */
-    virtual Partitioner<K1, V1, K2, V2, M>* createPartitioner(BSPContext<K1, V1, K2, V2, M>& context) const {
-      return NULL;
-    }
-    
-    /**
-     * Create an application record reader.
-     * @return the new RecordReader or NULL, if the Java RecordReader should be
-     *    used.
-     */
-    virtual RecordReader<K1,V1>* createRecordReader(BSPContext<K1, V1, K2, V2, M>& context) const {
-      return NULL;
-    }
-    
-    /**
-     * Create an application record writer.
-     * @return the new RecordWriter or NULL, if the Java RecordWriter should be
-     *    used.
-     */
-    virtual RecordWriter<K2,V2>* createRecordWriter(BSPContext<K1, V1, K2, V2, M>& context) const {
+    virtual Partitioner<K1, V1>* createPartitioner(BSPContext<K1, V1, K2, V2, M>& context) const {
       return NULL;
     }
     
     virtual ~Factory() {}
   };
   
+  /********************************************/
+  /***************** toString *****************/
+  /********************************************/
   /**
    * Generic toString
    */
-  template <class T>
+  template <typename T>
   string toString(const T& t)
   {
     std::ostringstream oss;
@@ -612,94 +814,100 @@ namespace HamaPipes {
     return t;
   }
   
+  /********************************************/
+  /*************** Serialization **************/
+  /********************************************/
   /**
    * Generic serialization
    */
-  template<class T>
-  void serialize(T t, OutStream& stream) {
-    serializeString(toString<T>(t), stream);
+  template<typename T>
+  void serialize(T t, HadoopUtils::OutStream& stream) {
+    HadoopUtils::serializeString(toString<T>(t), stream);
   }
   
   /**
    * Generic serialization template specializations
    */
-  template <> void serialize<int32_t>(int32_t t, OutStream& stream) {
-    serializeInt(t, stream);
+  template <> void serialize<int32_t>(int32_t t, HadoopUtils::OutStream& stream) {
+    HadoopUtils::serializeInt(t, stream);
     if (logging) {
       fprintf(stderr,"HamaPipes::BinaryProtocol::serializeInt '%d'\n", t);
     }
   }
-  template <> void serialize<int64_t>(int64_t t, OutStream& stream) {
-    serializeLong(t, stream);
+  template <> void serialize<int64_t>(int64_t t, HadoopUtils::OutStream& stream) {
+    HadoopUtils::serializeLong(t, stream);
     if (logging) {
       fprintf(stderr,"HamaPipes::BinaryProtocol::serializeLong '%ld'\n", (long)t);
     }
   }
-  template <> void serialize<float>(float t, OutStream& stream) {
-    serializeFloat(t, stream);
+  template <> void serialize<float>(float t, HadoopUtils::OutStream& stream) {
+    HadoopUtils::serializeFloat(t, stream);
     if (logging) {
       fprintf(stderr,"HamaPipes::BinaryProtocol::serializeFloat '%f'\n", t);
     }
   }
-  template <> void serialize<double>(double t, OutStream& stream) {
-    serializeDouble(t, stream);
+  template <> void serialize<double>(double t, HadoopUtils::OutStream& stream) {
+    HadoopUtils::serializeDouble(t, stream);
     if (logging) {
       fprintf(stderr,"HamaPipes::BinaryProtocol::serializeDouble '%f'\n", t);
     }
   }
-  template <> void serialize<string>(string t, OutStream& stream) {
-    serializeString(t, stream);
+  template <> void serialize<string>(string t, HadoopUtils::OutStream& stream) {
+    HadoopUtils::serializeString(t, stream);
     if (logging) {
       fprintf(stderr,"HamaPipes::BinaryProtocol::serializeString '%s'\n", t.c_str());
     }
   }
   
+  /********************************************/
+  /************** Deserialization *************/
+  /********************************************/
   /**
    * Generic deserialization
    */
-  template<class T>
-  T deserialize(InStream& stream) {
+  template<typename T>
+  T deserialize(HadoopUtils::InStream& stream) {
     string str = "Not able to deserialize type: ";
-    throw Error(str.append(typeid(T).name()));
+    throw HadoopUtils::Error(str.append(typeid(T).name()));
   }
   
   /**
    * Generic deserialization template specializations
    */
-  template <> int32_t deserialize<int32_t>(InStream& stream) {
-    int32_t result = deserializeInt(stream);
+  template <> int32_t deserialize<int32_t>(HadoopUtils::InStream& stream) {
+    int32_t result = HadoopUtils::deserializeInt(stream);
     if (logging) {
       fprintf(stderr,"HamaPipes::BinaryProtocol::deserializeInt result: '%d'\n",
               result);
     }
     return result;
   }
-  template <> int64_t deserialize<int64_t>(InStream& stream) {
-    int64_t result = deserializeLong(stream);
+  template <> int64_t deserialize<int64_t>(HadoopUtils::InStream& stream) {
+    int64_t result = HadoopUtils::deserializeLong(stream);
     if (logging) {
       fprintf(stderr,"HamaPipes::BinaryProtocol::deserializeLong result: '%ld'\n",
               (long)result);
     }
     return result;
   }
-  template <> float deserialize<float>(InStream& stream) {
-    float result = deserializeFloat(stream);
+  template <> float deserialize<float>(HadoopUtils::InStream& stream) {
+    float result = HadoopUtils::deserializeFloat(stream);
     if (logging) {
       fprintf(stderr,"HamaPipes::BinaryProtocol::deserializeFloat result: '%f'\n",
               result);
     }
     return result;
   }
-  template <> double deserialize<double>(InStream& stream) {
-    double result = deserializeDouble(stream);
+  template <> double deserialize<double>(HadoopUtils::InStream& stream) {
+    double result = HadoopUtils::deserializeDouble(stream);
     if (logging) {
       fprintf(stderr,"HamaPipes::BinaryProtocol::deserializeDouble result: '%f'\n",
               result);
     }
     return result;
   }
-  template <> string deserialize<string>(InStream& stream) {
-    string result = deserializeString(stream);
+  template <> string deserialize<string>(HadoopUtils::InStream& stream) {
+    string result = HadoopUtils::deserializeString(stream);
     
     if (logging) {
       if (result.empty()) {
@@ -712,17 +920,20 @@ namespace HamaPipes {
     return result;
   }
   
+  /********************************************/
+  /*********** runTask entry method ***********/
+  /********************************************/
   /**
    * Run the assigned task in the framework.
    * The user's main function should set the various functions using the
    * set* functions above and then call this.
    * @return true, if the task succeeded.
    */
-  template<class K1, class V1, class K2, class V2, class M>
+  template<typename K1, typename V1, typename K2, typename V2, typename M>
   bool runTask(const Factory<K1, V1, K2, V2, M>& factory);
   
   // Include implementation in header because of templates
-  #include "../../impl/Pipes.cc"
+#include "../../impl/Pipes.cc"
 }
 
 #endif
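
The header above relies throughout on CRTP plus partial specialization on void to pick the right Reader/Writer/Messenger signatures at compile time. The following standalone sketch (illustrative only, not Hama code; names such as doWrite and Context are made up) shows the pattern in isolation: when the key type is void, only the value-only write() overload exists, and it forwards to the derived class.

#include <iostream>

template<typename D, typename K, typename V>
struct Writer {
  // Both key and value types are concrete: write a full pair.
  void write(const K& key, const V& value) {
    static_cast<D*>(this)->template doWrite<K,V>(key, value);
  }
};

template<typename D, typename V>
struct Writer<D, void, V> {
  // Key type is void (NullWritable): only the value is written.
  void write(const V& value) {
    static_cast<D*>(this)->template doWrite<V>(value);
  }
};

// A toy "context" that inherits the void-key specialization.
struct Context : Writer<Context, void, double> {
  template<typename V>
  void doWrite(const V& value) { std::cout << value << "\n"; }
};

int main() {
  Context ctx;
  ctx.write(3.14);  // resolves to the value-only overload
  return 0;
}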

Modified: hama/trunk/c++/src/main/native/pipes/impl/Pipes.cc
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/main/native/pipes/impl/Pipes.cc?rev=1557100&r1=1557099&r2=1557100&view=diff
==============================================================================
--- hama/trunk/c++/src/main/native/pipes/impl/Pipes.cc (original)
+++ hama/trunk/c++/src/main/native/pipes/impl/Pipes.cc Fri Jan 10 11:59:46 2014
@@ -21,13 +21,17 @@
 /********************************************/
 class BinaryUpwardProtocol: public UpwardProtocol<BinaryUpwardProtocol> {
 private:
-  FileOutStream* out_stream_;
+  HadoopUtils::FileOutStream* out_stream_;
 public:
   BinaryUpwardProtocol(FILE* out_stream) {
-    out_stream_ = new FileOutStream();
+    out_stream_ = new HadoopUtils::FileOutStream();
     HADOOP_ASSERT(out_stream_->open(out_stream), "problem opening stream");
   }
   
+  virtual void sendCommand(int32_t cmd) {
+    sendCommand(cmd, true);
+  }
+  
   /* local sendCommand function */
   void sendCommand(int32_t cmd, bool flush) {
     serialize<int32_t>(cmd, *out_stream_);
@@ -40,7 +44,7 @@ public:
     }
   }
   
-  template<class T>
+  template<typename T>
   void sendCommand(int32_t cmd, T value) {
     sendCommand(cmd, false);
     // Write out generic value
@@ -52,7 +56,7 @@ public:
     }
   }
   
-  template<class T>
+  template<typename T>
   void sendCommand(int32_t cmd, const T values[], int size) {
     sendCommand(cmd, false);
     // Write out generic values
@@ -66,7 +70,7 @@ public:
     out_stream_->flush();
   }
   
-  template<class T1, class T2>
+  template<typename T1, typename T2>
   void sendCommand(int32_t cmd, T1 value1, T2 value2) {
     sendCommand(cmd, false);
     // Write out generic value1
@@ -84,7 +88,31 @@ public:
     out_stream_->flush();
   }
   
-  template<class T1, class T2>
+  template<typename T1, typename T2, typename T3>
+  void sendCommand(int32_t cmd, T1 value1, T2 value2, T3 value3) {
+    sendCommand(cmd, false);
+    // Write out generic value1
+    serialize<T1>(value1, *out_stream_);
+    if(logging) {
+      fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent CMD: %s with Param1: '%s'\n",
+              messageTypeNames[cmd], toString<T1>(value1).c_str());
+    }
+    // Write out generic value2
+    serialize<T2>(value2, *out_stream_);
+    if(logging) {
+      fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent CMD: %s with Param2: '%s'\n",
+              messageTypeNames[cmd], toString<T2>(value2).c_str());
+    }
+    // Write out generic value3
+    serialize<T3>(value3, *out_stream_);
+    if(logging) {
+      fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent CMD: %s with Param3: '%s'\n",
+              messageTypeNames[cmd], toString<T3>(value3).c_str());
+    }
+    out_stream_->flush();
+  }
+  
+  template<typename T1, typename T2>
   void sendCommand(int32_t cmd, T1 value, const T2 values[], int size) {
     sendCommand(cmd, false);
     // Write out generic value
@@ -104,27 +132,6 @@ public:
     out_stream_->flush();
   }
   
-  virtual void sendCommand(int32_t cmd) {
-    sendCommand(cmd, true);
-  }
-  
-  /*
-   virtual void registerCounter(int id, const string& group,
-   const string& name) {
-   serialize<int32_t>(REGISTER_COUNTER, *stream);
-   serialize<int32_t>(id, *stream);
-   serialize<string>(group, *stream);
-   serialize<string>(name, *stream);
-   }
-   
-   virtual void incrementCounter(const TaskContext::Counter* counter,
-   uint64_t amount) {
-   serialize<int32_t>(INCREMENT_COUNTER, *stream);
-   serialize<int32_t>(counter->getId(), *stream);
-   serialize<int64_t>(amount, *stream);
-   }
-   */
-  
   virtual void incrementCounter(const string& group, const string& name, uint64_t amount) {
     serialize<int32_t>(INCREMENT_COUNTER, *out_stream_);
     serialize<string>(group, *out_stream_);
@@ -141,19 +148,97 @@ public:
   }
 };
 
+#define NEXT_EVENT_COMMANDS \
+      case START_MESSAGE: {\
+        int32_t protocol_version = deserialize<int32_t>(*in_stream_);\
+        if(logging) {\
+          fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got START_MESSAGE protocol_version: %d\n",\
+                  protocol_version);\
+        }\
+        handler_->start(protocol_version);\
+        break;\
+      }\
+      case SET_BSPJOB_CONF: {\
+        int32_t entries = deserialize<int32_t>(*in_stream_);\
+        if(logging) {\
+          fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SET_BSPJOB_CONF entries: %d\n",\
+                  entries);\
+        }\
+        vector<string> properties(entries*2);\
+        for(int i=0; i < entries*2; ++i) {\
+          string item = deserialize<string>(*in_stream_);\
+          properties.push_back(item);\
+          if(logging) {\
+            fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SET_BSPJOB_CONF add NewEntry: %s\n",\
+                    item.c_str());\
+          }\
+        }\
+        if(logging) {\
+          fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got all Configuration %d entries!\n",\
+                  entries);\
+        }\
+        handler_->setBSPJob(properties);\
+        break;\
+      }\
+      case SET_INPUT_TYPES: {\
+        string key_type = deserialize<string>(*in_stream_);\
+        string value_type = deserialize<string>(*in_stream_);\
+        if(logging) {\
+          fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SET_INPUT_TYPES keyType: %s valueType: %s\n",\
+                  key_type.c_str(), value_type.c_str());\
+        }\
+        handler_->setInputTypes(key_type, value_type);\
+        break;\
+      }\
+      case RUN_SETUP: {\
+        if(logging) {\
+          fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got RUN_SETUP\n");\
+        }\
+        handler_->runSetup();\
+        break;\
+      }\
+      case RUN_BSP: {\
+        if(logging) {\
+          fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got RUN_BSP\n");\
+        }\
+        handler_->runBsp();\
+        break;\
+      }\
+      case RUN_CLEANUP: {\
+        if(logging) {\
+          fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got RUN_CLEANUP\n");\
+        }\
+        handler_->runCleanup();\
+        break;\
+      }\
+      case CLOSE: {\
+        if(logging) {\
+          fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got CLOSE\n");\
+        }\
+        handler_->close();\
+        break;\
+      }\
+      case ABORT: {\
+        if(logging) {\
+          fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got ABORT\n");\
+        }\
+        handler_->abort();\
+        break;\
+      }
+
 /********************************************/
 /************** BinaryProtocol **************/
 /********************************************/
-template<class K1, class V1>
-class BinaryProtocol: public Protocol< BinaryProtocol<K1,V1> > {
+template<typename D, typename K1, typename V1>
+class BinaryProtocol: public Protocol< BinaryProtocol<D, K1, V1>, K1, V1 > {
 private:
-  FileInStream* in_stream_;
-  DownwardProtocol<K1,V1>* handler_;
+  HadoopUtils::FileInStream* in_stream_;
+  DownwardProtocol<D, K1, V1>* handler_;
   BinaryUpwardProtocol* uplink_;
   
 public:
-  BinaryProtocol(FILE* in_stream, DownwardProtocol<K1,V1>* handler, FILE* uplink) {
-    in_stream_ = new FileInStream();
+  BinaryProtocol(FILE* in_stream, DownwardProtocol<D, K1, V1>* handler, FILE* uplink) {
+    in_stream_ = new HadoopUtils::FileInStream();
     in_stream_->open(in_stream);
     uplink_ = new BinaryUpwardProtocol(uplink);
     handler_ = handler;
@@ -164,130 +249,85 @@ public:
   }
   
   /**
-   * Wait for next event, but don't expect a response for
-   * a previously sent command
+   * Wait for next event and handle it
    */
   void nextEvent() {
     // read command
-    int32_t cmd;
-    cmd = deserializeInt(*in_stream_);
+    int32_t cmd = deserialize<int32_t>(*in_stream_);
     
     switch (cmd) {
         
-      case START_MESSAGE: {
-        int32_t protocol_version;
-        protocol_version = deserialize<int32_t>(*in_stream_);
-        if(logging) {
-          fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got START_MESSAGE protocol_version: %d\n",
-                  protocol_version);
-        }
-        handler_->start(protocol_version);
-        break;
-      }
-      // setup BSP Job Configuration
-      case SET_BSPJOB_CONF: {
-        int32_t entries;
-        entries = deserialize<int32_t>(*in_stream_);
-        if(logging) {
-          fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SET_BSPJOB_CONF entries: %d\n",
-                  entries);
-        }
-        vector<string> properties(entries*2);
-        for(int i=0; i < entries*2; ++i) {
-          string item;
-          item = deserialize<string>(*in_stream_);
-          properties.push_back(item);
-          if(logging) {
-            fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SET_BSPJOB_CONF add NewEntry: %s\n",
-                    item.c_str());
-          }
-        }
-        if(logging) {
-          fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got all Configuration %d entries!\n",
-                  entries);
-        }
-        handler_->setBSPJob(properties);
-        break;
-      }
-      case SET_INPUT_TYPES: {
-        string key_type;
-        string value_type;
-        key_type = deserialize<string>(*in_stream_);
-        value_type = deserialize<string>(*in_stream_);
-        if(logging) {
-          fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SET_INPUT_TYPES keyType: %s valueType: %s\n",
-                  key_type.c_str(), value_type.c_str());
-        }
-        handler_->setInputTypes(key_type, value_type);
-        break;
-      }
-      case RUN_SETUP: {
-        if(logging) {
-          fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got RUN_SETUP\n");
-        }
-        int32_t piped_input;
-        int32_t piped_output;
-        piped_input = deserialize<int32_t>(*in_stream_);
-        piped_output = deserialize<int32_t>(*in_stream_);
-        handler_->runSetup(piped_input, piped_output);
-        break;
-      }
-      case RUN_BSP: {
-        if(logging) {
-          fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got RUN_BSP\n");
-        }
-        int32_t piped_input;
-        int32_t piped_output;
-        piped_input = deserialize<int32_t>(*in_stream_);
-        piped_output = deserialize<int32_t>(*in_stream_);
-        handler_->runBsp(piped_input, piped_output);
-        break;
-      }
-      case RUN_CLEANUP: {
-        if(logging) {
-          fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got RUN_CLEANUP\n");
-        }
-        int32_t piped_input;
-        int32_t piped_output;
-        piped_input = deserialize<int32_t>(*in_stream_);
-        piped_output = deserialize<int32_t>(*in_stream_);
-        handler_->runCleanup(piped_input, piped_output);
-        break;
+        NEXT_EVENT_COMMANDS
+        
+      default: {
+        fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - Unknown binary command: %d\n",
+                cmd);
+        HADOOP_ASSERT(false, "HamaPipes::BinaryProtocol::nextEvent: Unknown binary command " + toString(cmd));
       }
+    }
+  }
+  
+  /**
+   * Wait for next event and handle it
+   */
+  template<typename K, typename V>
+  void nextEvent() {
+    // read command
+    int32_t cmd = deserialize<int32_t>(*in_stream_);
+    
+    switch (cmd) {
+        
+        NEXT_EVENT_COMMANDS
+        
       case PARTITION_REQUEST: {
         if(logging) {
           fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got PARTITION_REQUEST\n");
         }
         
-        K1 partion_key;
-        V1 partion_value;
-        int32_t num_tasks;
-        
-        partion_key = deserialize<K1>(*in_stream_);
-        partion_value = deserialize<V1>(*in_stream_);
-        num_tasks = deserialize<int32_t>(*in_stream_);
+        K partion_key = deserialize<K>(*in_stream_);
+        V partion_value = deserialize<V>(*in_stream_);
+        int32_t num_tasks = deserialize<int32_t>(*in_stream_);
         
         handler_->runPartition(partion_key, partion_value, num_tasks);
+        
         break;
       }
-      case CLOSE: {
-        if(logging) {
-          fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got CLOSE\n");
-        }
-        handler_->close();
-        break;
+      default: {
+        fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - Unknown binary command: %d\n",
+                cmd);
+        HADOOP_ASSERT(false, "HamaPipes::BinaryProtocol::nextEvent: Unknown binary command " + toString(cmd));
       }
-      case ABORT: {
+    }
+  }
+  
+  /**
+   * Wait for next event and handle it
+   */
+  template<typename T>
+  void nextEvent() {
+    // read command
+    int32_t cmd = deserialize<int32_t>(*in_stream_);
+    
+    switch (cmd) {
+        
+        NEXT_EVENT_COMMANDS
+        
+      case PARTITION_REQUEST: {
         if(logging) {
-          fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got ABORT\n");
+          fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got PARTITION_REQUEST\n");
         }
-        handler_->abort();
+        
+        T partion_key_or_value = deserialize<T>(*in_stream_);
+        int32_t num_tasks = deserialize<int32_t>(*in_stream_);
+        
+        handler_->runPartition(partion_key_or_value, num_tasks);
+        
         break;
       }
       default: {
-        HADOOP_ASSERT(false, "Unknown binary command " + toString(cmd));
         fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - Unknown binary command: %d\n",
                 cmd);
+        HADOOP_ASSERT(false, "HamaPipes::BinaryProtocol::nextEvent: Unknown binary command " + toString(cmd));
       }
     }
   }
@@ -306,11 +346,11 @@ public:
   /**
    * Wait for next event, which should be a response for
    * a previously sent command (expected_response_cmd)
-   * and return the generic result
+   *
+   * Returns a generic result
    */
-  template<class T>
+  template<typename T>
   T getResult(int32_t expected_response_cmd) {
-    
     T result = T();
     
     // read response command
@@ -320,102 +360,50 @@ public:
     if (expected_response_cmd == cmd) {
       
       switch (cmd) {
-          
-        case GET_MSG_COUNT: {
-          T msg_count;
-          msg_count = deserialize<T>(*in_stream_);
-          if(logging) {
-            fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_MSG_COUNT msg_count: '%s'\n",
-                    toString<T>(msg_count).c_str());
-          }
-          return msg_count;
-        }
-        case GET_MSG: {
-          T msg;
-          msg = deserialize<T>(*in_stream_);
-          if(logging) {
-            fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_MSG msg: '%s'\n",
-                    toString<T>(msg).c_str());
-          }
-          return msg;
-        }
-        case GET_PEERNAME: {
-          T peername;
-          peername = deserialize<T>(*in_stream_);
-          if(logging) {
-            fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_PEERNAME peername: %s\n",
-                    toString<T>(peername).c_str());
-          }
-          return peername;
-        }
-        case GET_PEER_INDEX: {
-          T peer_index = deserialize<T>(*in_stream_);
-          if(logging) {
-            fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_PEER_INDEX peer_index: '%s'\n",
-                    toString<T>(peer_index).c_str());
-          }
-          return peer_index;
-        }
-        case GET_PEER_COUNT: {
-          T peer_count = deserialize<T>(*in_stream_);
-          if(logging) {
-            fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_PEER_COUNT peer_count: '%s'\n",
-                    toString<T>(peer_count).c_str());
-          }
-          return peer_count;
-        }
-        case GET_SUPERSTEP_COUNT: {
-          T superstep_count = deserialize<T>(*in_stream_);
-          if(logging) {
-            fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_SUPERSTEP_COUNT superstep_count: '%s'\n",
-                    toString<T>(superstep_count).c_str());
-          }
-          return superstep_count;
-        }
-          
-        case SEQFILE_OPEN: {
-          T file_id = deserialize<T>(*in_stream_);
-          if(logging) {
-            fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SEQFILE_OPEN file_id: '%s'\n",
-                    toString<T>(file_id).c_str());
-          }
-          return file_id;
-        }
-        case SEQFILE_APPEND: {
+        case GET_MSG_COUNT:
+        case GET_MSG:
+        case GET_PEERNAME:
+        case GET_PEER_INDEX:
+        case GET_PEER_COUNT:
+        case GET_SUPERSTEP_COUNT:
+        case SEQFILE_OPEN:
+        case SEQFILE_APPEND:
+        case SEQFILE_CLOSE: {
           result = deserialize<T>(*in_stream_);
           if(logging) {
-            fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SEQFILE_APPEND result: '%s'\n",
+            fprintf(stderr,"HamaPipes::BinaryProtocol::getResult - got '%s' result: '%s'\n",
+                    messageTypeNames[cmd],
                     toString<T>(result).c_str());
           }
           return result;
         }
-        case SEQFILE_CLOSE: {
-          result = deserialize<T>(*in_stream_);
+      }
+    } else { // Not expected response
+      
+      switch (cmd) {
+        case END_OF_DATA: {
           if(logging) {
-            fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SEQFILE_CLOSE result: '%s'\n",
-                    toString<T>(result).c_str());
+            fprintf(stderr,"HamaPipes::BinaryProtocol::getResult - got END_OF_DATA\n");
           }
           return result;
         }
       }
-      // Not expected response
-    } else {
-      
+      // TODO
       /*
        case CLOSE: {
-       if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got CLOSE\n");
+       if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::getResult - got CLOSE\n");
        handler_->close();
        break;
        }
        case ABORT: {
-       if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got ABORT\n");
+       if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::getResult - got ABORT\n");
        handler_->abort();
        break;
        }
        */
-      HADOOP_ASSERT(false, "Unknown binary command " + toString(cmd));
-      fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent(%d) - Unknown binary command: %d\n",
+      fprintf(stderr,"HamaPipes::BinaryProtocol::getResult(expected_response_cmd=%d) - Unknown binary command: %d\n",
               expected_response_cmd, cmd);
+      HADOOP_ASSERT(false, "HamaPipes::BinaryProtocol::getResult: Unknown binary command " + toString(cmd));
     }
     return result;
   }
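  /*
   * Usage sketch (illustrative only): all scalar responses now share this
   * single deserialization path, so a caller pairs a request with the matching
   * expected response command, e.g.
   *
   *   uplink_->sendCommand(GET_SUPERSTEP_COUNT);
   *   int64_t superstep = protocol_->template getResult<int64_t>(GET_SUPERSTEP_COUNT);
   *
   * The response type (int64_t here) is an assumption for illustration; the
   * command id also serves as the index into messageTypeNames for logging.
   */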
@@ -423,11 +411,11 @@ public:
   /**
    * Wait for next event, which should be a response for
    * a previously sent command (expected_response_cmd)
-   * and return the generic vector result list
+   *
+   * Returns the generic vector result list
    */
-  template<class T>
+  template<typename T>
   vector<T> getVectorResult(int32_t expected_response_cmd) {
-    
     vector<T> results;
     
     // read response command
@@ -442,14 +430,14 @@ public:
           T peername;
           int32_t peername_count = deserialize<int32_t>(*in_stream_);
           if(logging) {
-            fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_ALL_PEERNAME peername_count: %d\n",
+            fprintf(stderr,"HamaPipes::BinaryProtocol::getVectorResult - got GET_ALL_PEERNAME peername_count: %d\n",
                     peername_count);
           }
           for (int i=0; i<peername_count; i++)  {
             peername = deserialize<T>(*in_stream_);
             peernames.push_back(peername);
             if(logging) {
-              fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_ALL_PEERNAME peername: '%s'\n",
+              fprintf(stderr,"HamaPipes::BinaryProtocol::getVectorResult - got GET_ALL_PEERNAME peername: '%s'\n",
                       toString<T>(peername).c_str());
             }
           }
@@ -457,9 +445,9 @@ public:
         }
       }
     } else {
-      HADOOP_ASSERT(false, "Unknown binary command " + toString(cmd));
-      fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent(%d) - Unknown binary command: %d\n",
+      fprintf(stderr,"HamaPipes::BinaryProtocol::getVectorResult(%d) - Unknown binary command: %d\n",
               expected_response_cmd, cmd);
+      HADOOP_ASSERT(false, "HamaPipes::BinaryProtocol::getVectorResult: Unknown binary command " + toString(cmd));
     }
     return results;
   }
@@ -467,12 +455,12 @@ public:
   /**
    * Wait for next event, which should be a response for
    * a previously sent command (expected_response_cmd)
-   * and return the generic KeyValuePair or an empty one
+   *
+   * Returns the generic KeyValuePair or an empty one
    * if no data is available
    */
-  template <class K, class V>
+  template <typename K, typename V>
   KeyValuePair<K,V> getKeyValueResult(int32_t expected_response_cmd) {
-    
     KeyValuePair<K,V> key_value_pair;
     
     // read response command
@@ -483,14 +471,16 @@ public:
       
       switch (cmd) {
           
-        case READ_KEYVALUE: {
+        case READ_KEYVALUE:
+        case SEQFILE_READNEXT: {
           K key = deserialize<K>(*in_stream_);
           V value = deserialize<V>(*in_stream_);
           
           if(logging) {
             string k = toString<K>(key);
             string v = toString<V>(value);
-            fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got READ_KEYVALUE key: '%s' value: '%s'\n",
+            fprintf(stderr,"HamaPipes::BinaryProtocol::getKeyValueResult - got '%s' key: '%s' value: '%s'\n",
+                    messageTypeNames[cmd],
                     ((k.length()<10)?k.c_str():k.substr(0,9).append("...").c_str()),
                     ((v.length()<10)?v.c_str():v.substr(0,9).append("...").c_str()) );
           }
@@ -498,34 +488,70 @@ public:
           key_value_pair = pair<K,V>(key, value);
           return key_value_pair;
         }
+        case END_OF_DATA: {
+          key_value_pair = KeyValuePair<K,V>(true);
+          if(logging) {
+            fprintf(stderr,"HamaPipes::BinaryProtocol::getKeyValueResult - got END_OF_DATA\n");
+          }
+        }
+          
+      }
+    } else {
+      key_value_pair = KeyValuePair<K,V>(true);
+      fprintf(stderr,"HamaPipes::BinaryProtocol::getKeyValueResult(expected_cmd = %d) - Unknown binary command: %d\n",
+              expected_response_cmd, cmd);
+      fprintf(stderr,"Error: Please verify serialization! The key or value type may not have been deserialized correctly!\n");
+      HADOOP_ASSERT(false, "HamaPipes::BinaryProtocol::getKeyValueResult: Unknown binary command " + toString(cmd));
+    }
+    return key_value_pair;
+  }
+  
+  /**
+   * Wait for next event, which should be a response for
+   * a previously sent command (expected_response_cmd)
+   *
+   * Returns the KeyValuePair with one value only
+   * or an empty one if no data is available
+   */
+  template <typename T>
+  KeyValuePair<T,T> getKeyValueResult(int32_t expected_response_cmd) {
+    KeyValuePair<T,T> key_value_pair;
+    
+    // read response command
+    int32_t cmd = deserialize<int32_t>(*in_stream_);
+    
+    // check if response is expected or END_OF_DATA
+    if ((expected_response_cmd == cmd) || (cmd == END_OF_DATA) ) {
+      
+      switch (cmd) {
+          
+        case READ_KEYVALUE:
         case SEQFILE_READNEXT: {
-          K key = deserialize<K>(*in_stream_);
-          V value = deserialize<V>(*in_stream_);
+          T value = deserialize<T>(*in_stream_);
           
           if(logging) {
-            string k = toString<K>(key);
-            string v = toString<V>(value);
-            fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SEQFILE_READNEXT key: '%s' value: '%s'\n",
-                    ((k.length()<10)?k.c_str():k.substr(0,9).append("...").c_str()),
+            string v = toString<T>(value);
+            fprintf(stderr,"HamaPipes::BinaryProtocol::getKeyValueResult - got '%s' value: '%s'\n",
+                    messageTypeNames[cmd],
                     ((v.length()<10)?v.c_str():v.substr(0,9).append("...").c_str()) );
           }
           
-          key_value_pair = pair<K,V>(key, value);
+          key_value_pair = pair<T,T>(value, value);
           return key_value_pair;
         }
         case END_OF_DATA: {
-          key_value_pair = KeyValuePair<K,V>(true);
+          key_value_pair = KeyValuePair<T,T>(true);
           if(logging) {
-            fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got END_OF_DATA\n");
+            fprintf(stderr,"HamaPipes::BinaryProtocol::getKeyValueResult - got END_OF_DATA\n");
           }
         }
       }
     } else {
-      key_value_pair = KeyValuePair<K,V>(true);
-      fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent(expected_cmd = %d) - Unknown binary command: %d\n",
+      key_value_pair = KeyValuePair<T,T>(true);
+      fprintf(stderr,"HamaPipes::BinaryProtocol::getKeyValueResult(expected_cmd = %d) - Unknown binary command: %d\n",
               expected_response_cmd, cmd);
-      fprintf(stderr,"ERORR: Please verfiy serialization! The key or value type could possibly not be deserialized!\n");
-      HADOOP_ASSERT(false, "Unknown binary command " + toString(cmd));
+      fprintf(stderr,"Error: Please verify serialization! The key or value type may not have been deserialized correctly!\n");
+      HADOOP_ASSERT(false, "HamaPipes::BinaryProtocol::getKeyValueResult: Unknown binary command " + toString(cmd));
     }
     return key_value_pair;
   }
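  /*
   * Usage sketch (illustrative only): when the key OR value type is
   * NullWritable on the Java side, only a single field crosses the wire, so
   * callers use this single-type overload and read the result from .first, e.g.
   *
   *   uplink_->sendCommand(READ_KEYVALUE);
   *   KeyValuePair<string,string> pair =
   *       protocol_->template getKeyValueResult<string>(READ_KEYVALUE);
   *   if (!pair.is_empty) {
   *     // consume pair.first
   *   }
   *
   * is_empty is set when END_OF_DATA arrives instead of the expected response.
   */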
@@ -540,16 +566,14 @@ public:
 /********************************************/
 /************** BSPContextImpl **************/
 /********************************************/
-template<class K1, class V1, class K2, class V2, class M>
-class BSPContextImpl: public BSPContext<K1, V1, K2, V2, M>, public DownwardProtocol<K1, V1> {
+template<typename K1, typename V1, typename K2, typename V2, typename M>
+class BSPContextImpl: public BSPContext<K1, V1, K2, V2, M>, public DownwardProtocol<BSPContextImpl<K1, V1, K2, V2, M>, K1, V1> {
 private:
   const Factory<K1, V1, K2, V2, M>* factory_;
   BSPJob* job_;
   BSP<K1, V1, K2, V2, M>* bsp_;
-  Partitioner<K1, V1, K2, V2, M>* partitioner_;
-  RecordReader<K1, V1>* reader_;
-  RecordWriter<K2, V2>* writer_;
-  Protocol< BinaryProtocol<K1,V1> >* protocol_;
+  Partitioner<K1, V1>* partitioner_;
+  Protocol< BinaryProtocol<BSPContextImpl<K1, V1, K2, V2, M>, K1, V1>, K1, V1 >* protocol_;
   UpwardProtocol<BinaryUpwardProtocol>* uplink_;
   
   bool done_;
@@ -563,13 +587,10 @@ private:
 public:
   
   BSPContextImpl(const Factory<K1, V1, K2, V2, M>& factory) {
-    
     factory_ = &factory;
     job_ = NULL;
     bsp_ = NULL;
     partitioner_ = NULL;
-    reader_ = NULL;
-    writer_ = NULL;
     protocol_ = NULL;
     uplink_ = NULL;
     
@@ -586,8 +607,8 @@ public:
   /********************************************/
   virtual void start(int protocol_version) {
     if (protocol_version != 0) {
-      throw Error("Protocol version " + toString(protocol_version) +
-                  " not supported");
+      throw HadoopUtils::Error("HamaPipes::BSPContextImpl::start Protocol version " + toString(protocol_version) +
+                               " not supported");
     }
     partitioner_ = factory_->createPartitioner(*this);
   }
@@ -595,7 +616,7 @@ public:
   virtual void setBSPJob(vector<string> values) {
     int len = values.size();
     BSPJobImpl* result = new BSPJobImpl();
-    HADOOP_ASSERT(len % 2 == 0, "Odd length of job conf values");
+    HADOOP_ASSERT(len % 2 == 0, "HamaPipes::BSPContextImpl::setBSPJob Odd length of job conf values");
     for(int i=0; i < len; i += 2) {
       result->set(values[i], values[i+1]);
     }
@@ -606,36 +627,7 @@ public:
     inputClass_ = pair<string,string>(key_type, value_type);
   }
   
-  /* local method */
-  void setupReaderWriter(bool piped_input, bool piped_output) {
-    
-    if(logging) {
-      fprintf(stderr,"HamaPipes::BSPContextImpl::setupReaderWriter - pipedInput: %s pipedOutput: %s\n",
-              (piped_input)?"true":"false", (piped_output)?"true":"false");
-    }
-    
-    if (piped_input && reader_==NULL) {
-      reader_ = factory_->createRecordReader(*this);
-      HADOOP_ASSERT((reader_ == NULL) == piped_input,
-                    piped_input ? "RecordReader defined when not needed.":
-                    "RecordReader not defined");
-      
-      //if (reader != NULL) {
-      //  value = new string();
-      //}
-    }
-    
-    if (piped_output && writer_==NULL) {
-      writer_ = factory_->createRecordWriter(*this);
-      HADOOP_ASSERT((writer_ == NULL) == piped_output,
-                    piped_output ? "RecordWriter defined when not needed.":
-                    "RecordWriter not defined");
-    }
-  }
-  
-  virtual void runSetup(bool piped_input, bool piped_output) {
-    setupReaderWriter(piped_input, piped_output);
-    
+  virtual void runSetup() {
     if (bsp_ == NULL) {
       bsp_ = factory_->createBSP(*this);
     }
@@ -648,9 +640,7 @@ public:
     }
   }
   
-  virtual void runBsp(bool piped_input, bool piped_output) {
-    setupReaderWriter(piped_input, piped_output);
-    
+  virtual void runBsp() {
     if (bsp_ == NULL) {
       bsp_ = factory_->createBSP(*this);
     }
@@ -663,9 +653,7 @@ public:
     }
   }
   
-  virtual void runCleanup(bool piped_input, bool piped_output) {
-    setupReaderWriter(piped_input, piped_output);
-    
+  virtual void runCleanup() {
     if (bsp_ != NULL) {
       has_task_ = true;
       bsp_->cleanup(*this);
@@ -674,20 +662,6 @@ public:
     }
   }
   
-  /********************************************/
-  /*******       Partitioner            *******/
-  /********************************************/
-  virtual void runPartition(const K1& key, const V1& value, int32_t num_tasks) {
-    if (partitioner_ != NULL) {
-      int part = partitioner_->partition(key, value, num_tasks);
-      uplink_->sendCommand<int32_t>(PARTITION_RESPONSE, part);
-    } else {
-      if(logging) {
-        fprintf(stderr,"HamaPipes::BSPContextImpl::runPartition Partitioner is NULL!\n");
-      }
-    }
-  }
-  
   virtual void close() {
     pthread_mutex_lock(&mutex_done_);
     done_ = true;
@@ -701,7 +675,34 @@ public:
   }
   
   virtual void abort() {
-    throw Error("Aborted by driver");
+    throw HadoopUtils::Error("HamaPipes::BSPContextImpl::abort Aborted by driver");
+  }
+  
+  /********************************************/
+  /***** DownwardProtocolPartition IMPL *******/
+  /********************************************/
+  template<typename K, typename V>
+  void runPartition(const K& key, const V& value, int32_t num_tasks) {
+    if (partitioner_ != NULL) {
+      int part = partitioner_->partition(key, value, num_tasks);
+      uplink_->sendCommand<int32_t>(PARTITION_RESPONSE, part);
+    } else {
+      if(logging) {
+        fprintf(stderr,"HamaPipes::BSPContextImpl::runPartition Partitioner is NULL!\n");
+      }
+    }
+  }
+  
+  template<typename T>
+  void runPartition(const T& key_or_value, int32_t num_tasks) {
+    if (partitioner_ != NULL) {
+      int part = partitioner_->partition(key_or_value, num_tasks);
+      uplink_->sendCommand<int32_t>(PARTITION_RESPONSE, part);
+    } else {
+      if(logging) {
+        fprintf(stderr,"HamaPipes::BSPContextImpl::runPartition Partitioner is NULL!\n");
+      }
+    }
   }
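  /*
   * Partitioner sketch (illustrative only; it assumes the public
   * Partitioner<K1,V1> interface exposes both partition() overloads used
   * above, which is not shown in this file):
   *
   *   class ModuloPartitioner: public Partitioner<string,string> {
   *   public:
   *     int partition(const string& key, const string& value, int32_t num_tasks) {
   *       return HadoopUtils::toInt(key) % num_tasks;          // key/value input
   *     }
   *     int partition(const string& key_or_value, int32_t num_tasks) {
   *       return HadoopUtils::toInt(key_or_value) % num_tasks; // NullWritable case
   *     }
   *   };
   */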
   
   /********************************************/
@@ -716,32 +717,16 @@ public:
   }
   
   /**
-   * Get the current key.
-   * @return the current key or NULL if called before the first map or reduce
-   */
-  //virtual const string& getInputKey() {
-  //  return key;
-  //}
-  
-  /**
-   * Get the current value.
-   * @return the current value or NULL if called before the first map or
-   *    reduce
-   */
-  //virtual const string& getInputValue() {
-  //  return *value;
-  //}
-  
-  /**
    * Register a counter with the given group and name.
    */
   virtual long getCounter(const string& group, const string& name) {
-   // TODO
-   // int id = registeredCounterIds.size();
-   // registeredCounterIds.push_back(id);
-   // uplink->registerCounter(id, group, name);
-   // return new Counter(id);
-   return 0;
+    // TODO
+    
+    // int id = registeredCounterIds.size();
+    // registeredCounterIds.push_back(id);
+    // uplink->registerCounter(id, group, name);
+    // return new Counter(id);
+    return 0;
   }
   
   /**
@@ -754,7 +739,7 @@ public:
     // Verify response command
     bool response = protocol_->verifyResult(INCREMENT_COUNTER);
     if (response == false) {
-      throw Error("incrementCounter received wrong response!");
+      throw HadoopUtils::Error("HamaPipes::BSPContextImpl::incrementCounter received wrong response!");
     }
   }
   
@@ -784,37 +769,7 @@ public:
   }
   
   /**
-   * Send a data with a tag to another BSPSlave corresponding to hostname.
-   * Messages sent by this method are not guaranteed to be received in a sent
-   * order.
-   */
-  virtual void sendMessage(const string& peer_name, const M& msg) {
-    uplink_->sendCommand<string,M>(SEND_MSG, peer_name, msg);
-    
-    // Verify response command
-    bool response = protocol_->verifyResult(SEND_MSG);
-    if (response == false) {
-      throw Error("sendMessage received wrong response!");
-    }
-  }
-  
-  /**
-   * @return A message from the peer's received messages queue (a FIFO).
-   */
-  virtual M getCurrentMessage() {
-    uplink_->sendCommand(GET_MSG);
-    
-    M message = protocol_->template getResult<M>(GET_MSG);
-    
-    if(logging) {
-      fprintf(stderr,"HamaPipes::BSPContextImpl::getMessage - result: %s\n",
-              toString<M>(message).c_str());
-    }
-    return message;
-  }
-  
-  /**
-   * @return The number of messages in the peer's received messages queue.
+   * Returns the number of messages in the peer's received messages queue.
    */
   virtual int getNumCurrentMessages() {
     uplink_->sendCommand(GET_MSG_COUNT);
@@ -840,12 +795,12 @@ public:
     // Verify response command
     bool response = protocol_->verifyResult(SYNC);
     if (response == false) {
-      throw Error("sync received wrong response!");
+      throw HadoopUtils::Error("HamaPipes::BSPContextImpl::sync received wrong response!");
     }
   }
   
   /**
-   * @return the name of this peer in the format "hostname:port".
+   * Returns the name of this peer in the format "hostname:port".
    */
   virtual string getPeerName() {
     // submit id=-1 to receive own peername
@@ -861,7 +816,7 @@ public:
   }
   
   /**
-   * @return the name of n-th peer from sorted array by name.
+   * Returns the name of the n-th peer from the array sorted by name.
    */
   virtual string getPeerName(int index) {
     uplink_->sendCommand<int32_t>(GET_PEERNAME, index);
@@ -872,24 +827,21 @@ public:
       fprintf(stderr,"HamaPipes::BSPContextImpl::getPeerName - result: %s\n",
               result.c_str());
     }
-    
     return result;
   }
   
   /**
-   * @return the names of all the peers executing tasks from the same job
+   * Returns the names of all the peers executing tasks from the same job
    *         (including this peer).
    */
   virtual vector<string> getAllPeerNames() {
     uplink_->sendCommand(GET_ALL_PEERNAME);
     
-    vector<string> results = protocol_->template getVectorResult<string>(GET_ALL_PEERNAME);
-    
-    return results;
+    return protocol_->template getVectorResult<string>(GET_ALL_PEERNAME);
   }
   
   /**
-   * @return the index of this peer from sorted array by name.
+   * Returns the index of this peer in the array sorted by name.
    */
   virtual int getPeerIndex() {
     uplink_->sendCommand(GET_PEER_INDEX);
@@ -904,7 +856,7 @@ public:
   }
   
   /**
-   * @return the number of peers
+   * Returns the number of peers
    */
   virtual int getNumPeers() {
     uplink_->sendCommand(GET_PEER_COUNT);
@@ -919,7 +871,7 @@ public:
   }
   
   /**
-   * @return the count of current super-step
+   * Returns the count of the current super-step
    */
   virtual long getSuperstepCount() {
     uplink_->sendCommand(GET_SUPERSTEP_COUNT);
@@ -942,36 +894,35 @@ public:
     // Verify response command
     bool response = protocol_->verifyResult(CLEAR);
     if (response == false) {
-      throw Error("clear received wrong response!");
+      throw HadoopUtils::Error("HamaPipes::BSPContextImpl::clear received wrong response!");
     }
   }
   
   /**
-   * Writes a key/value pair to the output collector
+   * Closes the input and opens it right away, so that the file pointer is at
+   * the beginning again.
    */
-  virtual void write(const K2& key, const V2& value) {
-    if (writer_ != NULL) {
-      writer_->emit(key, value); // TODO writer not implemented
-    } else {
-      uplink_->sendCommand<K2,V2>(WRITE_KEYVALUE, key, value);
-    }
-
+  virtual void reopenInput() {
+    uplink_->sendCommand(REOPEN_INPUT);
+    
     // Verify response command
-    bool response = protocol_->verifyResult(WRITE_KEYVALUE);
+    bool response = protocol_->verifyResult(REOPEN_INPUT);
     if (response == false) {
-      throw Error("write received wrong response!");
+      throw HadoopUtils::Error("HamaPipes::BSPContextImpl::reopenInput received wrong response!");
     }
   }
   
+  /********************************************/
+  /*******         Reader IMPL          *******/
+  /********************************************/
   /**
-   * Deserializes the next input key value into the given objects;
+   * Deserializes the next input key/value pair into the given objects
    */
-  virtual bool readNext(K1& key, V1& value) {
-    
+  template<typename K, typename V>
+  bool readNext(K& key, V& value) {
     uplink_->sendCommand(READ_KEYVALUE);
     
-    KeyValuePair<K1,V1> key_value_pair;
-    key_value_pair = protocol_->template getKeyValueResult<K1,V1>(READ_KEYVALUE);
+    KeyValuePair<K,V> key_value_pair = protocol_->template getKeyValueResult<K,V>(READ_KEYVALUE);
     
     if (!key_value_pair.is_empty) {
       key = key_value_pair.first;
@@ -986,19 +937,90 @@ public:
   }
   
   /**
-   * Closes the input and opens it right away, so that the file pointer is at
-   * the beginning again.
+   * Deserializes the next input key OR value into the given object
    */
-  virtual void reopenInput() {
-    uplink_->sendCommand(REOPEN_INPUT);
+  template<typename T>
+  bool readNext(T& key_or_value) {
+    uplink_->sendCommand(READ_KEYVALUE);
+    
+    KeyValuePair<T,T> key_value_pair = protocol_->template getKeyValueResult<T>(READ_KEYVALUE);
+    
+    if (!key_value_pair.is_empty) {
+      key_or_value = key_value_pair.first;
+    }
+    
+    if (logging && key_value_pair.is_empty) {
+      fprintf(stderr,"HamaPipes::BSPContextImpl::readNext - END_OF_DATA\n");
+    }
+    
+    return (!key_value_pair.is_empty);
+  }
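  /*
   * Reader usage sketch (illustrative only, assuming string input types; a BSP
   * implementation normally reaches these methods through its context):
   *
   *   string key, value;
   *   while (readNext(key, value)) {   // regular key/value input
   *     // process key and value
   *   }
   *
   *   string single;
   *   while (readNext(single)) {       // key OR value is NullWritable
   *     // process the single remaining field
   *   }
   */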
+  
+  /********************************************/
+  /*******         Writer IMPL          *******/
+  /********************************************/
+  /**
+   * Writes a key/value pair to the output collector
+   */
+  template<typename K, typename V>
+  void write(const K& key, const V& value) {
+    uplink_->sendCommand<K,V>(WRITE_KEYVALUE, key, value);
     
     // Verify response command
-    bool response = protocol_->verifyResult(REOPEN_INPUT);
+    bool response = protocol_->verifyResult(WRITE_KEYVALUE);
     if (response == false) {
-      throw Error("reopenInput received wrong response!");
+      throw HadoopUtils::Error("HamaPipes::BSPContextImpl::write received wrong response!");
     }
   }
   
+  /**
+   * Write key OR value to the output collector
+   */
+  template<typename T>
+  void write(const T& key_or_value) {
+    uplink_->sendCommand<T>(WRITE_KEYVALUE, key_or_value);
+    
+    // Verify response command
+    bool response = protocol_->verifyResult(WRITE_KEYVALUE);
+    if (response == false) {
+      throw HadoopUtils::Error("HamaPipes::BSPContextImpl::write received wrong response!");
+    }
+  }
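  /*
   * Writer usage sketch (illustrative only, assuming string output types):
   *
   *   write(string("key"), string("value"));   // regular key/value output
   *   write(string("value"));                  // key OR value is NullWritable
   */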
+  
+  /********************************************/
+  /*******       Messenger IMPL         *******/
+  /********************************************/
+  /**
+   * Sends data with a tag to the BSPSlave corresponding to the given hostname.
+   * Messages sent by this method are not guaranteed to be received in the
+   * order they were sent.
+   */
+  template<typename T>
+  void sendMessage(const string& peer_name, const T& msg) {
+    uplink_->sendCommand<string,T>(SEND_MSG, peer_name, msg);
+    
+    // Verify response command
+    bool response = protocol_->verifyResult(SEND_MSG);
+    if (response == false) {
+      throw HadoopUtils::Error("HamaPipes::BSPContextImpl::sendMessage received wrong response!");
+    }
+  }
+  
+  /**
+   * Returns a message from the peer's received messages queue (a FIFO).
+   */
+  template<typename T>
+  T getCurrentMessage() {
+    uplink_->sendCommand(GET_MSG);
+    
+    T message = protocol_->template getResult<T>(GET_MSG);
+    
+    if(logging) {
+      fprintf(stderr,"HamaPipes::BSPContextImpl::getMessage - result: %s\n",
+              toString<T>(message).c_str());
+    }
+    return message;
+  }
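  /*
   * Messenger usage sketch (illustrative only, assuming string messages):
   *
   *   sendMessage(getPeerName(0), string("ping"));
   *   sync();                                    // messages become visible after sync
   *   while (getNumCurrentMessages() > 0) {
   *     string msg = getCurrentMessage<string>();
   *     // process msg
   *   }
   */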
   
   /********************************************/
   /*******  SequenceFileConnector IMPL  *******/
@@ -1006,7 +1028,8 @@ public:
   
   /**
   * Open SequenceFile with option "r" or "w"
-   * @return the corresponding fileID
+   *
+   * Returns the corresponding fileID
    */
   virtual int32_t sequenceFileOpen(const string& path, const string& option,
                                    const string& key_type, const string& value_type) {
@@ -1056,15 +1079,13 @@ public:
    * Read next key/value pair from the SequenceFile with fileID
   * Using the Curiously Recurring Template Pattern (CRTP)
    */
-  template<class K, class V>
+  template<typename K, typename V>
   bool sequenceFileReadNext(int32_t file_id, K& key, V& value) {
-    
     // send request
     uplink_->sendCommand<int32_t>(SEQFILE_READNEXT, file_id);
     
     // get response
-    KeyValuePair<K,V> key_value_pair;
-    key_value_pair = protocol_->template getKeyValueResult<K,V>(SEQFILE_READNEXT);
+    KeyValuePair<K,V> key_value_pair = protocol_->template getKeyValueResult<K,V>(SEQFILE_READNEXT);
     
     if (!key_value_pair.is_empty) {
       key = key_value_pair.first;
@@ -1072,7 +1093,28 @@ public:
     }
     
     if (logging && key_value_pair.is_empty) {
-      fprintf(stderr,"HamaPipes::BSPContextImpl::readNext - END_OF_DATA\n");
+      fprintf(stderr,"HamaPipes::BSPContextImpl::sequenceFileReadNext - END_OF_DATA\n");
+    }
+    
+    return (!key_value_pair.is_empty);
+  }
+  
+  /**
+   * Read the next record from the SequenceFile with fileID when the
+   * key OR value type is NullWritable
+   */
+  template<typename T>
+  bool sequenceFileReadNext(int32_t file_id, T& key_or_value) {
+    uplink_->sendCommand<int32_t>(SEQFILE_READNEXT, file_id);
+    
+    KeyValuePair<T,T> key_value_pair = protocol_->template getKeyValueResult<T>(SEQFILE_READNEXT);
+    
+    if (!key_value_pair.is_empty) {
+      key_or_value = key_value_pair.first;
+    }
+    
+    if (logging && key_value_pair.is_empty) {
+      fprintf(stderr,"HamaPipes::BSPContextImpl::sequenceFileReadNext - END_OF_DATA\n");
     }
     
     return (!key_value_pair.is_empty);
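  /*
   * SequenceFile usage sketch (illustrative only; sequenceFileClose(file_id) is
   * assumed to exist alongside sequenceFileOpen, and the Java type names are
   * placeholders):
   *
   *   int32_t in = sequenceFileOpen("/tmp/input.seq", "r",
   *       "org.apache.hadoop.io.Text", "org.apache.hadoop.io.NullWritable");
   *   string key;
   *   while (sequenceFileReadNext(in, key)) {   // value side is NullWritable
   *     // process key
   *   }
   *   sequenceFileClose(in);
   */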
@@ -1082,10 +1124,28 @@ public:
    * Append the next key/value pair to the SequenceFile with fileID
   * Using the Curiously Recurring Template Pattern (CRTP)
    */
-  template<class K, class V>
+  template<typename K, typename V>
   bool sequenceFileAppend(int32_t file_id, const K& key, const V& value) {
-    string values[] = {key, value};
-    uplink_->sendCommand<int32_t,string>(SEQFILE_APPEND, file_id, values, 2);
+    uplink_->sendCommand<int32_t,K,V>(SEQFILE_APPEND, file_id, key, value);
+    
+    int result = protocol_->template getResult<int32_t>(SEQFILE_APPEND);
+    
+    if (logging && result==0) {
+      fprintf(stderr,"HamaPipes::BSPContextImpl::sequenceFileAppend - Nothing appended!\n");
+    } else if (logging) {
+      fprintf(stderr,"HamaPipes::BSPContextImpl::sequenceFileAppend - Successfully appended!\n");
+    }
+    
+    return (result==1);
+  }
+  
+  /**
+   * Append a record to the SequenceFile with fileID when the
+   * key OR value type is NullWritable
+   */
+  template<typename T>
+  bool sequenceFileAppend(int32_t file_id, const T& key_or_value) {
+    uplink_->sendCommand<int32_t,T>(SEQFILE_APPEND, file_id, key_or_value);
     
     int result = protocol_->template getResult<int32_t>(SEQFILE_APPEND);
     
@@ -1102,7 +1162,7 @@ public:
   /*************** Other STUFF  ***************/
   /********************************************/
   
-  void setProtocol(Protocol< BinaryProtocol<K1,V1> >* protocol, UpwardProtocol<BinaryUpwardProtocol>* uplink) {
+  void setProtocol(Protocol< BinaryProtocol<BSPContextImpl<K1, V1, K2, V2, M>, K1, V1>, K1, V1 >* protocol, UpwardProtocol<BinaryUpwardProtocol>* uplink) {
     protocol_ = protocol;
     uplink_ = uplink;
   }
@@ -1114,20 +1174,6 @@ public:
     return done_copy;
   }
   
-  /**
-   * Advance to the next value.
-   */
-  /*
-   bool nextValue() {
-   if (isNewKey || done) {
-   return false;
-   }
-   isNewValue = false;
-   //progress();
-   protocol->nextEvent();
-   return isNewValue;
-   }
-   */
   void waitForTask() {
     while (!done_ && !has_task_) {
       if(logging) {
@@ -1137,45 +1183,11 @@ public:
       protocol_->nextEvent();
     }
   }
-  /*
-   bool nextKey() {
-   if (reader == NULL) {
-   while (!isNewKey) {
-   nextValue();
-   if (done) {
-   return false;
-   }
-   }
-   key = *newKey;
-   } else {
-   if (!reader->next(key, const_cast<string&>(*value))) {
-   pthread_mutex_lock(&mutexDone);
-   done = true;
-   pthread_mutex_unlock(&mutexDone);
-   return false;
-   }
-   //progressFloat = reader->getProgress();
-   }
-   isNewKey = false;
-   
-   if (bsp != NULL) {
-   bsp->bsp(*this);
-   }
-   return true;
-   }
-   */
+  
   void closeAll() {
-    if (reader_) {
-      reader_->close();
-    }
-    
     if (bsp_) {
       bsp_->close();
     }
-    
-    if (writer_) {
-      writer_->close();
-    }
   }
   
   virtual ~BSPContextImpl() {
@@ -1183,8 +1195,6 @@ public:
     delete job_;
     delete bsp_;
     delete partitioner_;
-    delete reader_;
-    delete writer_;
     delete protocol_;
     delete uplink_;
     //delete inputSplit_;
@@ -1195,7 +1205,7 @@ public:
 /**
  * Ping the parent every 5 seconds to know if it is alive
  */
-template<class K1, class V1, class K2, class V2, class M>
+template<typename K1, typename V1, typename K2, typename V2, typename M>
 void* ping(void* ptr) {
   BSPContextImpl<K1, V1, K2, V2, M>* context = (BSPContextImpl<K1, V1, K2, V2, M>*) ptr;
   char* portStr = getenv("hama.pipes.command.port");
@@ -1211,7 +1221,7 @@ void* ping(void* ptr) {
                       string("problem creating socket: ") + strerror(errno));
         sockaddr_in addr;
         addr.sin_family = AF_INET;
-        addr.sin_port = htons(toInt(portStr));
+        addr.sin_port = htons(HadoopUtils::toInt(portStr));
         addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
         if(logging) {
           fprintf(stderr,"HamaPipes::ping - connected to GroomServer Port: %s\n",
@@ -1229,7 +1239,7 @@ void* ping(void* ptr) {
         HADOOP_ASSERT(result == 0, "problem closing socket");
       }
       remaining_retries = MAX_RETRIES;
-    } catch (Error& err) {
+    } catch (HadoopUtils::Error& err) {
       if (!context->isDone()) {
         fprintf(stderr, "Hama Pipes Exception: in ping %s\n",
                 err.getMessage().c_str());
@@ -1249,21 +1259,22 @@ void* ping(void* ptr) {
  * Run the assigned task in the framework.
  * The user's main function should set the various functions using the
  * set* functions above and then call this.
- * @return true, if the task succeeded.
+ *
+ * Returns true if the task succeeded.
  */
-template<class K1, class V1, class K2, class V2, class M>
+template<typename K1, typename V1, typename K2, typename V2, typename M>
 bool runTask(const Factory<K1, V1, K2, V2, M>& factory) {
   try {
     HADOOP_ASSERT(getenv("hama.pipes.logging")!=NULL, "No environment found!");
     
-    logging = (toInt(getenv("hama.pipes.logging"))==0)?false:true;
+    logging = (HadoopUtils::toInt(getenv("hama.pipes.logging"))==0)?false:true;
     if (logging) {
       fprintf(stderr,"HamaPipes::runTask - logging is: %s\n",
               ((logging)?"true":"false"));
     }
     
     BSPContextImpl<K1, V1, K2, V2, M>* context = new BSPContextImpl<K1, V1, K2, V2, M>(factory);
-    Protocol< BinaryProtocol<K1,V1> >* protocol;
+    Protocol< BinaryProtocol<BSPContextImpl<K1, V1, K2, V2, M>, K1, V1>, K1, V1 >* protocol;
     
     char* port_str = getenv("hama.pipes.command.port");
     int sock = -1;
@@ -1277,7 +1288,7 @@ bool runTask(const Factory<K1, V1, K2, V
                     string("problem creating socket: ") + strerror(errno));
       sockaddr_in addr;
       addr.sin_family = AF_INET;
-      addr.sin_port = htons(toInt(port_str));
+      addr.sin_port = htons(HadoopUtils::toInt(port_str));
       addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
       HADOOP_ASSERT(connect(sock, (sockaddr*) &addr, sizeof(addr)) == 0,
                     string("problem connecting command socket: ") +
@@ -1298,7 +1309,7 @@ bool runTask(const Factory<K1, V1, K2, V
       HADOOP_ASSERT(setbuf == 0, string("problem with setvbuf for out_stream: ")
                     + strerror(errno));
       
-      protocol = new BinaryProtocol<K1,V1>(in_stream, context, out_stream);
+      protocol = new BinaryProtocol<BSPContextImpl<K1, V1, K2, V2, M>, K1, V1>(in_stream, context, out_stream);
       if(logging) {
         fprintf(stderr,"HamaPipes::runTask - connected to GroomServer Port: %s\n",
                 port_str);
@@ -1310,7 +1321,7 @@ bool runTask(const Factory<K1, V1, K2, V
       out_filename += ".out";
       in_stream = fopen(filename, "r");
       out_stream = fopen(out_filename.c_str(), "w");
-      protocol = new BinaryProtocol<K1,V1>(in_stream, context, out_stream);
+      protocol = new BinaryProtocol<BSPContextImpl<K1, V1, K2, V2, M>, K1, V1>(in_stream, context, out_stream);
     } else {
       //protocol = new TextProtocol(stdin, context, stdout);
       fprintf(stderr,"HamaPipes::runTask - Protocol couldn't be initialized!\n");
@@ -1357,7 +1368,7 @@ bool runTask(const Factory<K1, V1, K2, V
     
     return true;
     
-  } catch (Error& err) {
+  } catch (HadoopUtils::Error& err) {
     fprintf(stderr, "Hama Pipes Exception: %s\n",
             err.getMessage().c_str());
     return false;
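/*
 * End-to-end sketch (illustrative only): a user binary plugs a BSP class into
 * runTask(). TemplateFactory is assumed to mirror the factory helper used by
 * the bundled examples and is not defined in this file; the five template
 * arguments are K1, V1, K2, V2 and the message type M.
 *
 *   class EchoBSP: public HamaPipes::BSP<string,string,string,string,string> {
 *   public:
 *     EchoBSP(HamaPipes::BSPContext<string,string,string,string,string>& context) {}
 *     void bsp(HamaPipes::BSPContext<string,string,string,string,string>& context) {
 *       string value;
 *       while (context.readNext(value)) {   // NullWritable key on input
 *         context.write(value);             // NullWritable key on output
 *       }
 *     }
 *   };
 *
 *   int main(int argc, char* argv[]) {
 *     return HamaPipes::runTask(
 *         HamaPipes::TemplateFactory<EchoBSP,string,string,string,string,string>());
 *   }
 */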


