hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From millec...@apache.org
Subject svn commit: r1527466 [2/2] - in /hama/trunk: ./ c++/src/ c++/src/main/native/examples/ c++/src/main/native/examples/conf/ c++/src/main/native/examples/impl/ c++/src/main/native/examples/input/ c++/src/main/native/pipes/impl/ c++/src/main/native/utils/i...
Date Mon, 30 Sep 2013 07:26:16 GMT
Modified: hama/trunk/c++/src/main/native/pipes/impl/HamaPipes.cc
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/main/native/pipes/impl/HamaPipes.cc?rev=1527466&r1=1527465&r2=1527466&view=diff
==============================================================================
--- hama/trunk/c++/src/main/native/pipes/impl/HamaPipes.cc (original)
+++ hama/trunk/c++/src/main/native/pipes/impl/HamaPipes.cc Mon Sep 30 07:26:15 2013
@@ -50,11 +50,11 @@ using std::endl;
 using namespace HadoopUtils;
 
 namespace HamaPipes {
-
+  
   bool logging;
   
   /********************************************/
-  /****************** BSPJob ******************/  
+  /****************** BSPJob ******************/
   /********************************************/
   class BSPJobImpl: public BSPJob {
   private:
@@ -63,37 +63,37 @@ namespace HamaPipes {
     void set(const string& key, const string& value) {
       values[key] = value;
     }
-
+    
     virtual bool hasKey(const string& key) const {
       return values.find(key) != values.end();
     }
-
+    
     virtual const string& get(const string& key) const {
       map<string,string>::const_iterator itr = values.find(key);
       if (itr == values.end()) {
         throw Error("Key " + key + " not found in BSPJob");
-      }        
+      }
       return itr->second;
     }
-
+    
     virtual int getInt(const string& key) const {
       const string& val = get(key);
       return toInt(val);
     }
-
+    
     virtual float getFloat(const string& key) const {
       const string& val = get(key);
       return toFloat(val);
     }
-
+    
     virtual bool getBoolean(const string&key) const {
       const string& val = get(key);
       return toBool(val);
     }
   };
-    
+  
   /********************************************/
-  /************* DownwardProtocol *************/  
+  /************* DownwardProtocol *************/
   /********************************************/
   class DownwardProtocol {
   public:
@@ -101,26 +101,26 @@ namespace HamaPipes {
     virtual void setBSPJob(vector<string> values) = 0;
     virtual void setInputTypes(string keyType, string valueType) = 0;
     virtual void setKeyValue(const string& _key, const string& _value) = 0;
-      
+    
     virtual void runBsp(bool pipedInput, bool pipedOutput) = 0;
     virtual void runCleanup(bool pipedInput, bool pipedOutput) = 0;
     virtual void runSetup(bool pipedInput, bool pipedOutput) = 0;
-    virtual void runPartition(const string& key, const string& value, int32_t numTasks) = 0;  
-      
+    virtual void runPartition(const string& key, const string& value, int32_t numTasks) = 0;
+    
     virtual void setNewResult(int32_t value) = 0;
-    virtual void setNewResult(int64_t value) = 0;  
+    virtual void setNewResult(int64_t value) = 0;
     virtual void setNewResult(const string&  value) = 0;
     virtual void setNewResult(vector<string> value) = 0;
-      
+    
     //virtual void reduceKey(const string& key) = 0;
     //virtual void reduceValue(const string& value) = 0;
     virtual void close() = 0;
     virtual void abort() = 0;
     virtual ~DownwardProtocol() {}
   };
-
+  
   /********************************************/
-  /************** UpwardProtocol **************/  
+  /************** UpwardProtocol **************/
   /********************************************/
   class UpwardProtocol {
   public:
@@ -129,15 +129,15 @@ namespace HamaPipes {
     virtual void sendCMD(int32_t cmd, int32_t value, const string values[], int size) = 0;
     virtual void sendCMD(int32_t cmd, const string& value) = 0;
     virtual void sendCMD(int32_t cmd, const string values[], int size) = 0;
-      
+    
     //virtual void registerCounter(int id, const string& group, const string& name) = 0;
     //virtual void incrementCounter(const TaskContext::Counter* counter, uint64_t amount) = 0;
     virtual void incrementCounter(const string& group, const string& name, uint64_t amount) = 0;
     virtual ~UpwardProtocol() {}
   };
-    
+  
   /********************************************/
-  /***************** Protocol *****************/  
+  /***************** Protocol *****************/
   /********************************************/
   class Protocol {
   public:
@@ -145,47 +145,47 @@ namespace HamaPipes {
     virtual UpwardProtocol* getUplink() = 0;
     virtual ~Protocol(){}
   };
-    
+  
   /********************************************/
-  /*************** MESSAGE_TYPE ***************/  
+  /*************** MESSAGE_TYPE ***************/
   /********************************************/
   enum MESSAGE_TYPE {
-      START_MESSAGE, SET_BSPJOB_CONF, SET_INPUT_TYPES,       
-      RUN_SETUP, RUN_BSP, RUN_CLEANUP,
-      READ_KEYVALUE, WRITE_KEYVALUE, 
-      GET_MSG, GET_MSG_COUNT, 
-      SEND_MSG, SYNC, 
-      GET_ALL_PEERNAME, GET_PEERNAME,
-      GET_PEER_INDEX, GET_PEER_COUNT, GET_SUPERSTEP_COUNT,
-      REOPEN_INPUT, CLEAR,
-      CLOSE, ABORT,
-      DONE, TASK_DONE, 
-      REGISTER_COUNTER, INCREMENT_COUNTER,
-      SEQFILE_OPEN, SEQFILE_READNEXT, 
-      SEQFILE_APPEND, SEQFILE_CLOSE,
-      PARTITION_REQUEST, PARTITION_RESPONSE
+    START_MESSAGE, SET_BSPJOB_CONF, SET_INPUT_TYPES,
+    RUN_SETUP, RUN_BSP, RUN_CLEANUP,
+    READ_KEYVALUE, WRITE_KEYVALUE,
+    GET_MSG, GET_MSG_COUNT,
+    SEND_MSG, SYNC,
+    GET_ALL_PEERNAME, GET_PEERNAME,
+    GET_PEER_INDEX, GET_PEER_COUNT, GET_SUPERSTEP_COUNT,
+    REOPEN_INPUT, CLEAR,
+    CLOSE, ABORT,
+    DONE, TASK_DONE,
+    REGISTER_COUNTER, INCREMENT_COUNTER,
+    SEQFILE_OPEN, SEQFILE_READNEXT,
+    SEQFILE_APPEND, SEQFILE_CLOSE,
+    PARTITION_REQUEST, PARTITION_RESPONSE
   };
-    
+  
   /* Only needed for debugging output */
   const char* messageTypeNames[] = {
-      stringify( START_MESSAGE ), stringify( SET_BSPJOB_CONF ), stringify( SET_INPUT_TYPES ),       
-      stringify( RUN_SETUP ), stringify( RUN_BSP ), stringify( RUN_CLEANUP ),
-      stringify( READ_KEYVALUE ), stringify( WRITE_KEYVALUE ), 
-      stringify( GET_MSG ), stringify( GET_MSG_COUNT ), 
-      stringify( SEND_MSG ), stringify( SYNC ), 
-      stringify( GET_ALL_PEERNAME ), stringify( GET_PEERNAME ),
-      stringify( GET_PEER_INDEX ), stringify( GET_PEER_COUNT ), stringify( GET_SUPERSTEP_COUNT ),
-      stringify( REOPEN_INPUT ), stringify( CLEAR ),
-      stringify( CLOSE ), stringify( ABORT ),
-      stringify( DONE ), stringify( TASK_DONE ), 
-      stringify( REGISTER_COUNTER ), stringify( INCREMENT_COUNTER ),
-      stringify( SEQFILE_OPEN ), stringify( SEQFILE_READNEXT ),
-      stringify( SEQFILE_APPEND ), stringify( SEQFILE_CLOSE ),
-      stringify( PARTITION_REQUEST ), stringify( PARTITION_RESPONSE )
-    };
-
+    stringify( START_MESSAGE ), stringify( SET_BSPJOB_CONF ), stringify( SET_INPUT_TYPES ),
+    stringify( RUN_SETUP ), stringify( RUN_BSP ), stringify( RUN_CLEANUP ),
+    stringify( READ_KEYVALUE ), stringify( WRITE_KEYVALUE ),
+    stringify( GET_MSG ), stringify( GET_MSG_COUNT ),
+    stringify( SEND_MSG ), stringify( SYNC ),
+    stringify( GET_ALL_PEERNAME ), stringify( GET_PEERNAME ),
+    stringify( GET_PEER_INDEX ), stringify( GET_PEER_COUNT ), stringify( GET_SUPERSTEP_COUNT ),
+    stringify( REOPEN_INPUT ), stringify( CLEAR ),
+    stringify( CLOSE ), stringify( ABORT ),
+    stringify( DONE ), stringify( TASK_DONE ),
+    stringify( REGISTER_COUNTER ), stringify( INCREMENT_COUNTER ),
+    stringify( SEQFILE_OPEN ), stringify( SEQFILE_READNEXT ),
+    stringify( SEQFILE_APPEND ), stringify( SEQFILE_CLOSE ),
+    stringify( PARTITION_REQUEST ), stringify( PARTITION_RESPONSE )
+  };
+  
   /********************************************/
-  /*********** BinaryUpwardProtocol ***********/  
+  /*********** BinaryUpwardProtocol ***********/
   /********************************************/
   class BinaryUpwardProtocol: public UpwardProtocol {
   private:
@@ -194,65 +194,69 @@ namespace HamaPipes {
     BinaryUpwardProtocol(FILE* _stream) {
       stream = new FileOutStream();
       HADOOP_ASSERT(stream->open(_stream), "problem opening stream");
-        
+      
     }
-
+    
     virtual void sendCMD(int32_t cmd) {
       serializeInt(cmd, *stream);
       stream->flush();
       if(logging)fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent CMD: %s\n",
-          messageTypeNames[cmd]);
+                         messageTypeNames[cmd]);
     }
-      
+    
     virtual void sendCMD(int32_t cmd, int32_t value) {
       serializeInt(cmd, *stream);
       serializeInt(value, *stream);
       stream->flush();
-      if(logging)fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent CMD: %s with Value: %d\n",messageTypeNames[cmd],value);
+      if(logging)fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent CMD: %s with Value: %d\n",
+                         messageTypeNames[cmd],value);
     }
-      
+    
     virtual void sendCMD(int32_t cmd, const string& value) {
       serializeInt(cmd, *stream);
       serializeString(value, *stream);
       stream->flush();
-      if(logging)fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent CMD: %s with Value: %s\n",messageTypeNames[cmd],value.c_str());
+      if(logging)fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent CMD: %s with Value: %s\n",
+                         messageTypeNames[cmd],value.c_str());
     }
-      
+    
     virtual void sendCMD(int32_t cmd, const string values[], int size) {
-      serializeInt(cmd, *stream);      
-      for (int i=0; i<size; i++) { 
+      serializeInt(cmd, *stream);
+      for (int i=0; i<size; i++) {
         serializeString(values[i], *stream);
-        if(logging)fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent CMD: %s with Param%d: %s\n",messageTypeNames[cmd],i+1,values[i].c_str());
+        if(logging)fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent CMD: %s with Param%d: %s\n",
+                           messageTypeNames[cmd],i+1,values[i].c_str());
       }
       stream->flush();
     }
-      
+    
     virtual void sendCMD(int32_t cmd, int32_t value, const string values[], int size) {
       serializeInt(cmd, *stream);
       serializeInt(value, *stream);
-      for (int i=0; i<size; i++) { 
+      for (int i=0; i<size; i++) {
         serializeString(values[i], *stream);
-        if(logging)fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent CMD: %s with Param%d: %s\n",messageTypeNames[cmd],i+1,values[i].c_str());
-      } 
+        if(logging)fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent CMD: %s with Param%d: %s\n",
+                           messageTypeNames[cmd],i+1,values[i].c_str());
+      }
       stream->flush();
     }
-
-    /*
-    virtual void registerCounter(int id, const string& group, 
-                                 const string& name) {
-      serializeInt(REGISTER_COUNTER, *stream);
-      serializeInt(id, *stream);
-      serializeString(group, *stream);
-      serializeString(name, *stream);
-    }
     
-    virtual void incrementCounter(const TaskContext::Counter* counter, 
-                                  uint64_t amount) {
-      serializeInt(INCREMENT_COUNTER, *stream);
-      serializeInt(counter->getId(), *stream);
-      serializeLong(amount, *stream);
-    }
-    */
+    /*
+     virtual void registerCounter(int id, const string& group,
+     const string& name) {
+     serializeInt(REGISTER_COUNTER, *stream);
+     serializeInt(id, *stream);
+     serializeString(group, *stream);
+     serializeString(name, *stream);
+     }
+     
+     virtual void incrementCounter(const TaskContext::Counter* counter,
+     uint64_t amount) {
+     serializeInt(INCREMENT_COUNTER, *stream);
+     serializeInt(counter->getId(), *stream);
+     serializeLong(amount, *stream);
+     }
+     */
     
     virtual void incrementCounter(const string& group, const string& name, uint64_t amount) {
       serializeInt(INCREMENT_COUNTER, *stream);
@@ -262,235 +266,258 @@ namespace HamaPipes {
       stream->flush();
       if(logging)fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent incrementCounter\n");
     }
-      
+    
     ~BinaryUpwardProtocol() {
       delete stream;
     }
   };
-
+  
   /********************************************/
-  /************** BinaryProtocol **************/  
+  /************** BinaryProtocol **************/
   /********************************************/
   class BinaryProtocol: public Protocol {
   private:
     FileInStream* downStream;
     DownwardProtocol* handler;
     BinaryUpwardProtocol * uplink;
-      
+    
     string key;
     string value;
-   
+    
   public:
     BinaryProtocol(FILE* down, DownwardProtocol* _handler, FILE* up) {
       downStream = new FileInStream();
       downStream->open(down);
       uplink = new BinaryUpwardProtocol(up);
       handler = _handler;
-      
-      //authDone = false;
-      //getPassword(password);
     }
-
+    
     UpwardProtocol* getUplink() {
       return uplink;
     }
-
-      
+    
+    
     virtual void nextEvent() {
       int32_t cmd;
       cmd = deserializeInt(*downStream);
-        
-     switch (cmd) {
-            
-      case START_MESSAGE: {
-        int32_t prot;
-        prot = deserializeInt(*downStream);
-        if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got START_MESSAGE prot: %d\n", prot); 
-        handler->start(prot);
-        break;
-      }
-      /* SET BSP Job Configuration / Environment */
-      case SET_BSPJOB_CONF: {
-        int32_t entries;
-        entries = deserializeInt(*downStream);
-        if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SET_BSPJOB_CONF entries: %d\n", entries); 
-        vector<string> result(entries*2);
-        for(int i=0; i < entries*2; ++i) {
-          string item;
-          deserializeString(item, *downStream);
-          result.push_back(item);
-          if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SET_BSPJOB_CONF add NewEntry: %s\n", item.c_str()); 
-        }
-        if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got all Configuration %d entries!\n", entries);
-        handler->setBSPJob(result);
-        break;
-      }
-      case SET_INPUT_TYPES: {
-        string keyType;
-        string valueType;
-        deserializeString(keyType, *downStream);
-        deserializeString(valueType, *downStream);
-        if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SET_INPUT_TYPES keyType: %s valueType: %s\n",
-                keyType.c_str(),valueType.c_str()); 
-        handler->setInputTypes(keyType, valueType);
-        break;
-      }
-      case READ_KEYVALUE: {
-        deserializeString(key, *downStream);
-        deserializeString(value, *downStream);
-        if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got READ_KEYVALUE key: %s value: %s\n",
-                key.c_str(),
-                ((value.length()<10)?value.c_str():value.substr(0,9).c_str()) ); 
-        handler->setKeyValue(key, value);
-        break;
-      }
-      case RUN_SETUP: {
-        if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got RUN_SETUP\n"); 
-        int32_t pipedInput;
-        int32_t pipedOutput;
-        pipedInput = deserializeInt(*downStream);
-        pipedOutput = deserializeInt(*downStream);
-        handler->runSetup(pipedInput, pipedOutput);
-        break;
-      }
-      case RUN_BSP: {
-        if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got RUN_BSP\n"); 
-        int32_t pipedInput;
-        int32_t pipedOutput;
-        pipedInput = deserializeInt(*downStream);
-        pipedOutput = deserializeInt(*downStream);
-        handler->runBsp(pipedInput, pipedOutput);
-        break;
-      }
-      case RUN_CLEANUP: {
-        if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got RUN_CLEANUP\n"); 
-        int32_t pipedInput;
-        int32_t pipedOutput;
-        pipedInput = deserializeInt(*downStream);
-        pipedOutput = deserializeInt(*downStream);
-        handler->runCleanup(pipedInput, pipedOutput);
-        break;
-      }
-      
-      case PARTITION_REQUEST: {
-        if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got PARTITION_REQUEST\n"); 
-        string partionKey;
-        string partionValue;
-        int32_t numTasks;
-        deserializeString(partionKey, *downStream);
-        deserializeString(partionValue, *downStream);
-        numTasks = deserializeInt(*downStream);
-        handler->runPartition(partionKey, partionValue, numTasks);
-        break;
-      }
-
-        
-      case GET_MSG_COUNT: {
-        int32_t msgCount = deserializeInt(*downStream);
-        if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_MSG_COUNT msgCount: %d\n",msgCount); 
-        handler->setNewResult(msgCount);
-        break;
-      }
-      case GET_MSG: {
-        string msg;
-        deserializeString(msg,*downStream);
-        if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_MSG msg: %s\n",msg.c_str());
-        handler->setNewResult(msg);
-        break;
-      }
-      case GET_PEERNAME: {
-        string peername;
-        deserializeString(peername,*downStream);
-        if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_PEERNAME peername: %s\n",peername.c_str());
-        handler->setNewResult(peername);
-        break;
-      }
-      case GET_ALL_PEERNAME: {
-        vector<string> peernames;
-        int32_t peernameCount = deserializeInt(*downStream);
-        if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_ALL_PEERNAME peernameCount: %d\n",peernameCount);
-        string peername;
-        for (int i=0; i<peernameCount; i++)  {
+      
+      switch (cmd) {
+          
+        case START_MESSAGE: {
+          int32_t prot;
+          prot = deserializeInt(*downStream);
+          if(logging)
+            fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got START_MESSAGE prot: %d\n", prot);
+          handler->start(prot);
+          break;
+        }
+          /* SET BSP Job Configuration / Environment */
+        case SET_BSPJOB_CONF: {
+          int32_t entries;
+          entries = deserializeInt(*downStream);
+          if(logging)
+            fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SET_BSPJOB_CONF entries: %d\n",
+                    entries);
+          vector<string> result(entries*2);
+          for(int i=0; i < entries*2; ++i) {
+            string item;
+            deserializeString(item, *downStream);
+            result.push_back(item);
+            if(logging)
+              fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SET_BSPJOB_CONF add NewEntry: %s\n",
+                      item.c_str());
+          }
+          if(logging)
+            fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got all Configuration %d entries!\n",
+                    entries);
+          handler->setBSPJob(result);
+          break;
+        }
+        case SET_INPUT_TYPES: {
+          string keyType;
+          string valueType;
+          deserializeString(keyType, *downStream);
+          deserializeString(valueType, *downStream);
+          if(logging)
+            fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SET_INPUT_TYPES keyType: %s valueType: %s\n",
+                    keyType.c_str(),valueType.c_str());
+          handler->setInputTypes(keyType, valueType);
+          break;
+        }
+        case READ_KEYVALUE: {
+          deserializeString(key, *downStream);
+          deserializeString(value, *downStream);
+          if(logging)
+            fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got READ_KEYVALUE key: %s value: %s\n",
+                    key.c_str(),((value.length()<10)?value.c_str():value.substr(0,9).c_str()));
+          handler->setKeyValue(key, value);
+          break;
+        }
+        case RUN_SETUP: {
+          if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got RUN_SETUP\n");
+          int32_t pipedInput;
+          int32_t pipedOutput;
+          pipedInput = deserializeInt(*downStream);
+          pipedOutput = deserializeInt(*downStream);
+          handler->runSetup(pipedInput, pipedOutput);
+          break;
+        }
+        case RUN_BSP: {
+          if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got RUN_BSP\n");
+          int32_t pipedInput;
+          int32_t pipedOutput;
+          pipedInput = deserializeInt(*downStream);
+          pipedOutput = deserializeInt(*downStream);
+          handler->runBsp(pipedInput, pipedOutput);
+          break;
+        }
+        case RUN_CLEANUP: {
+          if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got RUN_CLEANUP\n");
+          int32_t pipedInput;
+          int32_t pipedOutput;
+          pipedInput = deserializeInt(*downStream);
+          pipedOutput = deserializeInt(*downStream);
+          handler->runCleanup(pipedInput, pipedOutput);
+          break;
+        }
+          
+        case PARTITION_REQUEST: {
+          if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got PARTITION_REQUEST\n");
+          string partionKey;
+          string partionValue;
+          int32_t numTasks;
+          deserializeString(partionKey, *downStream);
+          deserializeString(partionValue, *downStream);
+          numTasks = deserializeInt(*downStream);
+          handler->runPartition(partionKey, partionValue, numTasks);
+          break;
+        }
+          
+        case GET_MSG_COUNT: {
+          int32_t msgCount = deserializeInt(*downStream);
+          if(logging)
+            fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_MSG_COUNT msgCount: %d\n",
+                    msgCount);
+          handler->setNewResult(msgCount);
+          break;
+        }
+        case GET_MSG: {
+          string msg;
+          deserializeString(msg,*downStream);
+          if(logging)
+            fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_MSG msg: %s\n",
+                    msg.c_str());
+          handler->setNewResult(msg);
+          break;
+        }
+        case GET_PEERNAME: {
+          string peername;
           deserializeString(peername,*downStream);
-          peernames.push_back(peername);
-          if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_ALL_PEERNAME peername: %s\n",peername.c_str());
+          if(logging)
+            fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_PEERNAME peername: %s\n",
+                    peername.c_str());
+          handler->setNewResult(peername);
+          break;
         }
-        handler->setNewResult(peernames);
-        break;
-      }
-      case GET_PEER_INDEX: {
-        int32_t peerIndex = deserializeInt(*downStream);
-        if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_PEER_INDEX peerIndex: %d\n",peerIndex); 
-        handler->setNewResult(peerIndex);
-        break;
-      }
-      case GET_PEER_COUNT: {
-        int32_t peerCount = deserializeInt(*downStream);
-        if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_PEER_COUNT peerCount: %d\n",peerCount); 
-        handler->setNewResult(peerCount);
-        break;
-      }
-      case GET_SUPERSTEP_COUNT: {
-        int64_t superstepCount = deserializeLong(*downStream);
-        if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_SUPERSTEP_COUNT superstepCount: %ld\n",(long)superstepCount); 
-        handler->setNewResult(superstepCount);
-        break;
-      }
-             
-             
-      case SEQFILE_OPEN: {
-        int32_t fileID = deserializeInt(*downStream);
-        if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SEQFILE_OPEN fileID: %d\n",fileID); 
-        handler->setNewResult(fileID);
-        break;
-      }    
-      case SEQFILE_READNEXT: {
-        deserializeString(key, *downStream);
-        deserializeString(value, *downStream);
-        if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SEQFILE_READNEXT key: %s value: %s\n", 
-                key.c_str(),
-                ((value.length()<10)?value.c_str():value.substr(0,9).c_str()) ); 
-        handler->setKeyValue(key, value);
-        break;
-      }
-      case SEQFILE_APPEND: {
+        case GET_ALL_PEERNAME: {
+          vector<string> peernames;
+          int32_t peernameCount = deserializeInt(*downStream);
+          if(logging)
+            fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_ALL_PEERNAME peernameCount: %d\n",
+                    peernameCount);
+          string peername;
+          for (int i=0; i<peernameCount; i++)  {
+            deserializeString(peername,*downStream);
+            peernames.push_back(peername);
+            if(logging)
+              fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_ALL_PEERNAME peername: %s\n",
+                      peername.c_str());
+          }
+          handler->setNewResult(peernames);
+          break;
+        }
+        case GET_PEER_INDEX: {
+          int32_t peerIndex = deserializeInt(*downStream);
+          if(logging)
+            fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_PEER_INDEX peerIndex: %d\n",
+                    peerIndex);
+          handler->setNewResult(peerIndex);
+          break;
+        }
+        case GET_PEER_COUNT: {
+          int32_t peerCount = deserializeInt(*downStream);
+          if(logging)
+            fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_PEER_COUNT peerCount: %d\n",
+                    peerCount);
+          handler->setNewResult(peerCount);
+          break;
+        }
+        case GET_SUPERSTEP_COUNT: {
+          int64_t superstepCount = deserializeLong(*downStream);
+          if(logging)
+            fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_SUPERSTEP_COUNT superstepCount: %ld\n",
+                    (long)superstepCount);
+          handler->setNewResult(superstepCount);
+          break;
+        }
+          
+          
+        case SEQFILE_OPEN: {
+          int32_t fileID = deserializeInt(*downStream);
+          if(logging)
+            fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SEQFILE_OPEN fileID: %d\n",fileID);
+          handler->setNewResult(fileID);
+          break;
+        }
+        case SEQFILE_READNEXT: {
+          deserializeString(key, *downStream);
+          deserializeString(value, *downStream);
+          if(logging)
+            fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SEQFILE_READNEXT key: %s value: %s\n",
+                    key.c_str(),((value.length()<10)?value.c_str():value.substr(0,9).c_str()));
+          handler->setKeyValue(key, value);
+          break;
+        }
+        case SEQFILE_APPEND: {
           int32_t result = deserializeInt(*downStream);
-          if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SEQFILE_APPEND result: %d\n",result);
+          if(logging)
+            fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SEQFILE_APPEND result: %d\n",result);
           handler->setNewResult(result);
           break;
-      }   
-      case SEQFILE_CLOSE: {
+        }
+        case SEQFILE_CLOSE: {
           int32_t result = deserializeInt(*downStream);
-          if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SEQFILE_CLOSE result: %d\n",result);
+          if(logging)
+            fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SEQFILE_CLOSE result: %d\n",result);
           handler->setNewResult(result);
           break;
+        }
+          
+          
+        case CLOSE: {
+          if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got CLOSE\n");
+          handler->close();
+          break;
+        }
+        case ABORT: {
+          if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got ABORT\n");
+          handler->abort();
+          break;
+        }
+        default:
+          HADOOP_ASSERT(false, "Unknown binary command " + toString(cmd));
+          fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - Unknown binary command: %d\n",cmd);
       }
-             
-        
-      case CLOSE: {
-        if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got CLOSE\n"); 
-        handler->close();
-        break;
-      }
-      case ABORT: {
-        if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got ABORT\n"); 
-        handler->abort();
-        break;
-      }        
-      default:
-        HADOOP_ASSERT(false, "Unknown binary command " + toString(cmd));
-        fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - Unknown binary command: %d\n",cmd); 
-      }
-     }
-      
+    }
+    
     virtual ~BinaryProtocol() {
       delete downStream;
       delete uplink;
     }
   };
-
+  
   /********************************************/
-  /************** BSPContextImpl **************/  
+  /************** BSPContextImpl **************/
   /********************************************/
   class BSPContextImpl: public BSPContext, public DownwardProtocol {
   private:
@@ -509,7 +536,7 @@ namespace HamaPipes {
     //float progressFloat;
     //uint64_t lastProgress;
     //bool statusSet;
-      
+    
     Protocol* protocol;
     UpwardProtocol *uplink;
     
@@ -517,36 +544,36 @@ namespace HamaPipes {
     
     RecordReader* reader;
     RecordWriter* writer;
-      
+    
     BSP* bsp;
     Partitioner* partitioner;
     
     const Factory* factory;
     pthread_mutex_t mutexDone;
     std::vector<int> registeredCounterIds;
-      
+    
     int32_t resultInt;
-    bool isNewResultInt;  
+    bool isNewResultInt;
     int64_t resultLong;
-    bool isNewResultLong; 
+    bool isNewResultLong;
     string resultString;
-    bool isNewResultString;   
+    bool isNewResultString;
     vector<string> resultVector;
-    bool isNewResultVector; 
+    bool isNewResultVector;
     
-    bool isNewKeyValuePair;  
+    bool isNewKeyValuePair;
     string currentKey;
     string currentValue;
-
+    
   public:
-
+    
     BSPContextImpl(const Factory& _factory) {
       //statusSet = false;
       done = false;
       //newKey = NULL;
       factory = &_factory;
       job = NULL;
-        
+      
       inputKeyClass = NULL;
       inputValueClass = NULL;
       
@@ -563,26 +590,26 @@ namespace HamaPipes {
       //progressFloat = 0.0f;
       hasTask = false;
       pthread_mutex_init(&mutexDone, NULL);
-        
+      
       isNewResultInt = false;
       isNewResultString = false,
       isNewResultVector = false;
-        
+      
       isNewKeyValuePair = false;
     }
-
-  
+    
+    
     /********************************************/
-    /*********** DownwardProtocol IMPL **********/  
+    /*********** DownwardProtocol IMPL **********/
     /********************************************/
     virtual void start(int protocol) {
       if (protocol != 0) {
-        throw Error("Protocol version " + toString(protocol) + 
+        throw Error("Protocol version " + toString(protocol) +
                     " not supported");
       }
       partitioner = factory->createPartitioner(*this);
     }
-
+    
     virtual void setBSPJob(vector<string> values) {
       int len = values.size();
       BSPJobImpl* result = new BSPJobImpl();
@@ -592,24 +619,25 @@ namespace HamaPipes {
       }
       job = result;
     }
-
+    
     virtual void setInputTypes(string keyType, string valueType) {
       inputKeyClass = new string(keyType);
       inputValueClass = new string(valueType);
     }
-      
+    
     virtual void setKeyValue(const string& _key, const string& _value) {
       currentKey = _key;
       currentValue = _value;
       isNewKeyValuePair = true;
     }
-     
+    
     /* private Method */
     void setupReaderWriter(bool pipedInput, bool pipedOutput) {
-        
-      if(logging)fprintf(stderr,"HamaPipes::BSPContextImpl::setupReaderWriter - pipedInput: %s pipedOutput: %s\n",
-              (pipedInput)?"true":"false",(pipedOutput)?"true":"false");
-
+      
+      if(logging)
+        fprintf(stderr,"HamaPipes::BSPContextImpl::setupReaderWriter - pipedInput: %s pipedOutput: %s\n",
+                (pipedInput)?"true":"false",(pipedOutput)?"true":"false");
+      
       if (pipedInput && reader==NULL) {
         reader = factory->createRecordReader(*this);
         HADOOP_ASSERT((reader == NULL) == pipedInput,
@@ -617,10 +645,10 @@ namespace HamaPipes {
                       "RecordReader not defined");
         
         //if (reader != NULL) {
-        //    value = new string();
+        //  value = new string();
         //}
-      }  
-        
+      }
+      
       if (pipedOutput && writer==NULL) {
         writer = factory->createRecordWriter(*this);
         HADOOP_ASSERT((writer == NULL) == pipedOutput,
@@ -628,13 +656,13 @@ namespace HamaPipes {
                       "RecordWriter not defined");
       }
     }
-      
+    
     virtual void runSetup(bool pipedInput, bool pipedOutput) {
       setupReaderWriter(pipedInput,pipedOutput);
       
-      if (bsp == NULL)  
+      if (bsp == NULL)
         bsp = factory->createBSP(*this);
-        
+      
       if (bsp != NULL) {
         hasTask = true;
         bsp->setup(*this);
@@ -642,13 +670,13 @@ namespace HamaPipes {
         uplink->sendCMD(TASK_DONE);
       }
     }
-      
+    
     virtual void runBsp(bool pipedInput, bool pipedOutput) {
       setupReaderWriter(pipedInput,pipedOutput);
-
-      if (bsp == NULL)  
-          bsp = factory->createBSP(*this);
-
+      
+      if (bsp == NULL)
+        bsp = factory->createBSP(*this);
+      
       if (bsp != NULL) {
         hasTask = true;
         bsp->bsp(*this);
@@ -656,10 +684,10 @@ namespace HamaPipes {
         uplink->sendCMD(TASK_DONE);
       }
     }
-      
+    
     virtual void runCleanup(bool pipedInput, bool pipedOutput) {
       setupReaderWriter(pipedInput,pipedOutput);
-        
+      
       if (bsp != NULL) {
         hasTask = true;
         bsp->cleanup(*this);
@@ -667,54 +695,56 @@ namespace HamaPipes {
         uplink->sendCMD(TASK_DONE);
       }
     }
-      
+    
+    /********************************************/
+    /*******       Partitioner            *******/
     /********************************************/
-    /*******       Partitioner            *******/  
-    /********************************************/ 
     virtual void runPartition(const string& key, const string& value, int32_t numTasks){
-      if (partitioner != NULL) {             
+      if (partitioner != NULL) {
         int part = partitioner->partition(key, value, numTasks);
         uplink->sendCMD(PARTITION_RESPONSE, part);
       } else {
-        if(logging)fprintf(stderr,"HamaPipes::BSPContextImpl::runPartition Partitioner is NULL!\n");
+        if(logging)
+          fprintf(stderr,"HamaPipes::BSPContextImpl::runPartition Partitioner is NULL!\n");
       }
-    } 
-                          
+    }
+    
     virtual void setNewResult(int32_t _value) {
       resultInt = _value;
-      isNewResultInt = true;  
+      isNewResultInt = true;
     }
-
+    
     virtual void setNewResult(int64_t _value) {
       resultLong = _value;
-      isNewResultLong = true;  
+      isNewResultLong = true;
     }
-      
+    
     virtual void setNewResult(const string& _value) {
       resultString = _value;
-      isNewResultString = true;   
+      isNewResultString = true;
     }
-
+    
     virtual void setNewResult(vector<string> _value) {
       resultVector = _value;
-      isNewResultVector = true;    
+      isNewResultVector = true;
     }
-
+    
     virtual void close() {
       pthread_mutex_lock(&mutexDone);
       done = true;
       hasTask = false;
       pthread_mutex_unlock(&mutexDone);
-      if(logging)fprintf(stderr,"HamaPipes::BSPContextImpl::close - done: %s hasTask: %s\n",
+      if(logging)
+        fprintf(stderr,"HamaPipes::BSPContextImpl::close - done: %s hasTask: %s\n",
                 (done)?"true":"false",(hasTask)?"true":"false");
     }
-      
+    
     virtual void abort() {
       throw Error("Aborted by driver");
     }
-
+    
     /********************************************/
-    /************** TaskContext IMPL ************/  
+    /************** TaskContext IMPL ************/
     /********************************************/
     
     /**
@@ -723,78 +753,78 @@ namespace HamaPipes {
     virtual const BSPJob* getBSPJob() {
       return job;
     }
-
+    
     /**
-     * Get the current key. 
+     * Get the current key.
      * @return the current key or NULL if called before the first map or reduce
      */
     //virtual const string& getInputKey() {
     //  return key;
     //}
-
+    
     /**
-     * Get the current value. 
-     * @return the current value or NULL if called before the first map or 
+     * Get the current value.
+     * @return the current value or NULL if called before the first map or
      *    reduce
      */
     //virtual const string& getInputValue() {
     //  return *value;
     //}
-      
+    
     /**
      * Register a counter with the given group and name.
      */
     /*
-    virtual Counter* getCounter(const std::string& group, 
-                                  const std::string& name) {
-        int id = registeredCounterIds.size();
-        registeredCounterIds.push_back(id);
-        uplink->registerCounter(id, group, name);
-        return new Counter(id);
-    }*/
-      
+     virtual Counter* getCounter(const std::string& group,
+     const std::string& name) {
+     int id = registeredCounterIds.size();
+     registeredCounterIds.push_back(id);
+     uplink->registerCounter(id, group, name);
+     return new Counter(id);
+     }*/
+    
     /**
      * Increment the value of the counter with the given amount.
      */
     virtual void incrementCounter(const string& group, const string& name, uint64_t amount)  {
-        uplink->incrementCounter(group, name, amount); 
+      uplink->incrementCounter(group, name, amount);
     }
-      
+    
     /********************************************/
-    /************** BSPContext IMPL *************/  
+    /************** BSPContext IMPL *************/
     /********************************************/
-      
+    
     /**
      * Access the InputSplit of the bsp.
      */
     //virtual const string& getInputSplit() {
     //  return *inputSplit;
     //}
-      
+    
     /**
      * Get the name of the key class of the input to this task.
      */
     virtual const string& getInputKeyClass() {
       return *inputKeyClass;
     }
-
+    
     /**
      * Get the name of the value class of the input to this task.
      */
     virtual const string& getInputValueClass() {
       return *inputValueClass;
     }
-
+    
     /**
      * Send a data with a tag to another BSPSlave corresponding to hostname.
      * Messages sent by this method are not guaranteed to be received in a sent
      * order.
      */
     virtual void sendMessage(const string& peerName, const string& msg) {
-        string values[] = {peerName, msg};
-        uplink->sendCMD(SEND_MSG,values, 2);
+      string values[] = {peerName, msg};
+      uplink->sendCMD(SEND_MSG,values, 2);
     }
-      
+    
     /**
      * @return A message from the peer's received messages queue (a FIFO).
      */
@@ -802,29 +832,31 @@ namespace HamaPipes {
       uplink->sendCMD(GET_MSG);
       
       while (!isNewResultString)
-          protocol->nextEvent();
-        
+        protocol->nextEvent();
+      
       isNewResultString = false;
-      if(logging)fprintf(stderr,"HamaPipes::BSPContextImpl::getMessage - NewResultString: %s\n",resultString.c_str());
+      if(logging)
+        fprintf(stderr,"HamaPipes::BSPContextImpl::getMessage - NewResultString: %s\n",
+                resultString.c_str());
       return resultString;
     }
-
+    
     /**
      * @return The number of messages in the peer's received messages queue.
      */
     virtual int getNumCurrentMessages() {
       uplink->sendCMD(GET_MSG_COUNT);
-        
+      
       while (!isNewResultInt)
         protocol->nextEvent();
       
       isNewResultInt = false;
       return resultInt;
     }
-      
+    
     /**
      * Barrier Synchronization.
-     * 
+     *
      * Sends all the messages in the outgoing message queues to the corresponding
      * remote peers.
      */
@@ -834,14 +866,16 @@ namespace HamaPipes {
     
     /**
      * @return the name of this peer in the format "hostname:port".
-     */ 
+     */
     virtual const string& getPeerName() {
       uplink->sendCMD(GET_PEERNAME,-1);
-    
+      
       while (!isNewResultString)
         protocol->nextEvent();
-    
-      if(logging)fprintf(stderr,"HamaPipes::BSPContextImpl::getPeerName - NewResultString: %s\n",resultString.c_str());
+      
+      if(logging)
+        fprintf(stderr,"HamaPipes::BSPContextImpl::getPeerName - NewResultString: %s\n",
+                resultString.c_str());
       isNewResultString = false;
       return resultString;
     }
@@ -851,11 +885,13 @@ namespace HamaPipes {
      */
     virtual const string& getPeerName(int index) {
       uplink->sendCMD(GET_PEERNAME,index);
-        
+      
       while (!isNewResultString)
         protocol->nextEvent();
-  
-      if(logging)fprintf(stderr,"HamaPipes::BSPContextImpl::getPeerName - NewResultString: %s\n",resultString.c_str());
+      
+      if(logging)
+        fprintf(stderr,"HamaPipes::BSPContextImpl::getPeerName - NewResultString: %s\n",
+                resultString.c_str());
       isNewResultString = false;
       return resultString;
     }
@@ -866,10 +902,10 @@ namespace HamaPipes {
      */
     virtual vector<string> getAllPeerNames() {
       uplink->sendCMD(GET_ALL_PEERNAME);
-        
+      
       while (!isNewResultVector)
         protocol->nextEvent();
-        
+      
       isNewResultVector = false;
       return resultVector;
     }
@@ -879,39 +915,39 @@ namespace HamaPipes {
      */
     virtual int getPeerIndex() {
       uplink->sendCMD(GET_PEER_INDEX);
-        
+      
       while (!isNewResultInt)
         protocol->nextEvent();
-        
+      
       isNewResultInt = false;
       return resultInt;
     }
-      
+    
     /**
      * @return the number of peers
      */
     virtual int getNumPeers() {
       uplink->sendCMD(GET_PEER_COUNT);
-        
+      
       while (!isNewResultInt)
         protocol->nextEvent();
-        
+      
       isNewResultInt = false;
-      return resultInt;       
+      return resultInt;
     }
-      
+    
     /**
      * @return the count of current super-step
      */
     virtual long getSuperstepCount() {
       uplink->sendCMD(GET_SUPERSTEP_COUNT);
-        
+      
       while (!isNewResultLong)
         protocol->nextEvent();
-        
+      
       isNewResultLong = false;
-      return resultLong;     
-    }  
+      return resultLong;
+    }
     
     /**
      * Clears all queues entries.
@@ -919,17 +955,17 @@ namespace HamaPipes {
     virtual void clear() {
       uplink->sendCMD(CLEAR);
     }
-
+    
     /**
      * Writes a key/value pair to the output collector
      */
     virtual void write(const string& key, const string& value) {
-        if (writer != NULL) {
-            writer->emit(key, value);
-        } else {
-            string values[] = {key, value};
-            uplink->sendCMD(WRITE_KEYVALUE, values, 2);
-        }
+      if (writer != NULL) {
+        writer->emit(key, value);
+      } else {
+        string values[] = {key, value};
+        uplink->sendCMD(WRITE_KEYVALUE, values, 2);
+      }
     }
     
     /**
@@ -937,21 +973,30 @@ namespace HamaPipes {
      */
     virtual bool readNext(string& _key, string& _value) {
       uplink->sendCMD(READ_KEYVALUE);
-        
+      
       while (!isNewKeyValuePair)
         protocol->nextEvent();
       
       isNewKeyValuePair = false;
-        
+      
       _key = currentKey;
-      _value = currentValue;
       
-      if (logging && _key.empty() && _value.empty())  
+      // check if value is array [0, 1, 2, ...], and remove brackets
+      int len = currentValue.length();
+      if ( (currentValue[0]=='[') &&
+          (currentValue[len-1]==']') ) {
+        _value = currentValue.substr(1,len-2);
+      } else {
+        _value = currentValue;
+      }
+      
+      if (logging && _key.empty() && _value.empty()) {
         fprintf(stderr,"HamaPipes::BSPContextImpl::readNext - Empty KeyValuePair\n");
-        
+      }
+      
       return (!_key.empty() && !_value.empty());
     }
-       
+    
     /**
      * Closes the input and opens it right away, so that the file pointer is at
      * the beginning again.
@@ -959,154 +1004,163 @@ namespace HamaPipes {
     virtual void reopenInput() {
       uplink->sendCMD(REOPEN_INPUT);
     }
-      
-      
+    
+    
     /********************************************/
-    /*******  SequenceFileConnector IMPL  *******/  
-    /********************************************/     
-      
+    /*******  SequenceFileConnector IMPL  *******/
+    /********************************************/
+    
     /**
      * Open SequenceFile with opion "r" or "w"
      * @return the corresponding fileID
      */
-    virtual int sequenceFileOpen(const string& path, const string& option, 
+    virtual int sequenceFileOpen(const string& path, const string& option,
                                  const string& keyType, const string& valueType) {
-      if (logging)  
-        fprintf(stderr,"HamaPipes::BSPContextImpl::sequenceFileOpen - Path: %s\n",path.c_str());
-     
-      if ( (option.compare("r")==0) || (option.compare("w")==0))  {
-          
-          string values[] = {path, option, keyType, valueType};
-          uplink->sendCMD(SEQFILE_OPEN,values, 4);
+      if (logging)
+        fprintf(stderr,"HamaPipes::BSPContextImpl::sequenceFileOpen - Path: %s\n",
+                path.c_str());
       
-          while (!isNewResultInt)
-            protocol->nextEvent();
+      if ( (option.compare("r")==0) || (option.compare("w")==0))  {
+        
+        string values[] = {path, option, keyType, valueType};
+        uplink->sendCMD(SEQFILE_OPEN,values, 4);
+        
+        while (!isNewResultInt)
+          protocol->nextEvent();
         
-          isNewResultInt = false;
-          return resultInt;
-      } else { 
-          fprintf(stderr,"HamaPipes::BSPContextImpl::sequenceFileOpen wrong option: %s!\n",option.c_str());
-          return -1; //Error wrong option
+        isNewResultInt = false;
+        return resultInt;
+      } else {
+        fprintf(stderr,"HamaPipes::BSPContextImpl::sequenceFileOpen wrong option: %s!\n",
+                option.c_str());
+        return -1; //Error wrong option
       }
     }
-
+    
     /**
      * Read next key/value pair from the SequenceFile with fileID
      */
     virtual bool sequenceFileReadNext(int fileID, string& _key, string& _value) {
-        
+      
       uplink->sendCMD(SEQFILE_READNEXT,fileID);
-        
+      
       while (!isNewKeyValuePair)
         protocol->nextEvent();
-        
+      
       isNewKeyValuePair = false;
-        
+      
       _key = currentKey;
-      _value = currentValue;
-        
-      if (logging && _key.empty() && _value.empty())  
+      // check if value is array [0, 1, 2, ...], and remove brackets
+      int len = currentValue.length();
+      if ( (currentValue[0]=='[') &&
+          (currentValue[len-1]==']') ) {
+        _value = currentValue.substr(1,len-2);
+      } else {
+        _value = currentValue;
+      }
+      
+      if (logging && _key.empty() && _value.empty())
         fprintf(stderr,"HamaPipes::BSPContextImpl::sequenceFileReadNext - Empty KeyValuePair\n");
-        
+      
       return (!_key.empty() && !_value.empty());
     }
-
+    
     /**
      * Append the next key/value pair to the SequenceFile with fileID
      */
     virtual bool sequenceFileAppend(int fileID, const string& key, const string& value) {
       string values[] = {key, value};
       uplink->sendCMD(SEQFILE_APPEND,fileID, values, 2);
-                
+      
       while (!isNewResultInt)
         protocol->nextEvent();
-        
+      
       isNewResultInt = false;
       return (resultInt==1);
     }
-
+    
     /**
      * Close SequenceFile
      */
     virtual bool sequenceFileClose(int fileID) {
       uplink->sendCMD(SEQFILE_CLOSE,fileID);
-        
+      
       while (!isNewResultInt)
         protocol->nextEvent();
-        
-      if (logging && resultInt==0)  
+      
+      if (logging && resultInt==0)
         fprintf(stderr,"HamaPipes::BSPContextImpl::sequenceFileClose - Nothing was closed!\n");
       else if (logging)
         fprintf(stderr,"HamaPipes::BSPContextImpl::sequenceFileClose - File was successfully closed!\n");
-    
+      
       isNewResultInt = false;
       return (resultInt==1);
     }
-      
+    
     /********************************************/
-    /*************** Other STUFF  ***************/  
+    /*************** Other STUFF  ***************/
     /********************************************/
-      
+    
     void setProtocol(Protocol* _protocol, UpwardProtocol* _uplink) {
-        protocol = _protocol;
-        uplink = _uplink;
+      protocol = _protocol;
+      uplink = _uplink;
     }
-   
+    
     bool isDone() {
-        pthread_mutex_lock(&mutexDone);
-        bool doneCopy = done;
-        pthread_mutex_unlock(&mutexDone);
-        return doneCopy;
+      pthread_mutex_lock(&mutexDone);
+      bool doneCopy = done;
+      pthread_mutex_unlock(&mutexDone);
+      return doneCopy;
     }
-      
+    
     /**
      * Advance to the next value.
      */
     /*
-    bool nextValue() {
-        if (isNewKey || done) {
-            return false;
-        }
-        isNewValue = false;
-        //progress();
-        protocol->nextEvent();
-        return isNewValue;
-    } 
-    */
+     bool nextValue() {
+     if (isNewKey || done) {
+     return false;
+     }
+     isNewValue = false;
+     //progress();
+     protocol->nextEvent();
+     return isNewValue;
+     }
+     */
     void waitForTask() {
-        while (!done && !hasTask) {		
-            if(logging)fprintf(stderr,"HamaPipes::BSPContextImpl::waitForTask - done: %s hasTask: %s\n",
-                    (done)?"true":"false",(hasTask)?"true":"false");
-            protocol->nextEvent();
-        }
+      while (!done && !hasTask) {
+        if(logging)fprintf(stderr,"HamaPipes::BSPContextImpl::waitForTask - done: %s hasTask: %s\n",
+                           (done)?"true":"false",(hasTask)?"true":"false");
+        protocol->nextEvent();
+      }
     }
     /*
-    bool nextKey() {
-        if (reader == NULL) {
-            while (!isNewKey) {
-                nextValue();
-                if (done) {
-                    return false;
-                }
-            }
-            key = *newKey;
-        } else {
-            if (!reader->next(key, const_cast<string&>(*value))) {
-                pthread_mutex_lock(&mutexDone);
-                done = true;
-                pthread_mutex_unlock(&mutexDone);
-                return false;
-            }
-            //progressFloat = reader->getProgress();
-        }
-        isNewKey = false;
-          
-        if (bsp != NULL) {
-            bsp->bsp(*this);
-        }
-        return true;
-    }
-   */
+     bool nextKey() {
+     if (reader == NULL) {
+     while (!isNewKey) {
+     nextValue();
+     if (done) {
+     return false;
+     }
+     }
+     key = *newKey;
+     } else {
+     if (!reader->next(key, const_cast<string&>(*value))) {
+     pthread_mutex_lock(&mutexDone);
+     done = true;
+     pthread_mutex_unlock(&mutexDone);
+     return false;
+     }
+     //progressFloat = reader->getProgress();
+     }
+     isNewKey = false;
+     
+     if (bsp != NULL) {
+     bsp->bsp(*this);
+     }
+     return true;
+     }
+     */
     void closeAll() {
       if (reader) {
         reader->close();
@@ -1115,12 +1169,12 @@ namespace HamaPipes {
       if (bsp) {
         bsp->close();
       }
-     
+      
       if (writer) {
         writer->close();
       }
     }
-      
+    
     virtual ~BSPContextImpl() {
       delete job;
       delete inputKeyClass;
@@ -1135,9 +1189,9 @@ namespace HamaPipes {
       pthread_mutex_destroy(&mutexDone);
     }
   };
-
+  
   /**
-   * Ping the parent every 5 seconds to know if it is alive 
+   * Ping the parent every 5 seconds to know if it is alive
    */
   void* ping(void* ptr) {
     BSPContextImpl* context = (BSPContextImpl*) ptr;
@@ -1156,11 +1210,13 @@ namespace HamaPipes {
           addr.sin_family = AF_INET;
           addr.sin_port = htons(toInt(portStr));
           addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
-          if(logging)fprintf(stderr,"HamaPipes::ping - connected to GroomServer Port: %s\n", portStr);   
+          if(logging)
+            fprintf(stderr,"HamaPipes::ping - connected to GroomServer Port: %s\n",
+                    portStr);
           HADOOP_ASSERT(connect(sock, (sockaddr*) &addr, sizeof(addr)) == 0,
                         string("problem connecting command socket: ") +
                         strerror(errno));
-
+          
         }
         if (sock != -1) {
           int result = shutdown(sock, SHUT_RDWR);
@@ -1171,8 +1227,8 @@ namespace HamaPipes {
         remaining_retries = MAX_RETRIES;
       } catch (Error& err) {
         if (!context->isDone()) {
-          fprintf(stderr, "Hama Pipes Exception: in ping %s\n", 
-                err.getMessage().c_str());
+          fprintf(stderr, "Hama Pipes Exception: in ping %s\n",
+                  err.getMessage().c_str());
           remaining_retries -= 1;
           if (remaining_retries == 0) {
             exit(1);
@@ -1184,23 +1240,25 @@ namespace HamaPipes {
     }
     return NULL;
   }
-    
+  
   /**
    * Run the assigned task in the framework.
-   * The user's main function should set the various functions using the 
+   * The user's main function should set the various functions using the
    * set* functions above and then call this.
    * @return true, if the task succeeded.
    */
   bool runTask(const Factory& factory) {
     try {
       HADOOP_ASSERT(getenv("hama.pipes.logging")!=NULL,"No environment found!");
-        
-      logging = (toInt(getenv("hama.pipes.logging"))==0)?false:true;  
-      if(logging)fprintf(stderr,"HamaPipes::runTask - logging is: %s\n", (logging)?"true":"false"); 
-        
+      
+      logging = (toInt(getenv("hama.pipes.logging"))==0)?false:true;
+      if(logging)
+        fprintf(stderr,"HamaPipes::runTask - logging is: %s\n",
+                ((logging)?"true":"false"));
+      
       BSPContextImpl* context = new BSPContextImpl(factory);
       Protocol* connection;
-        
+      
       char* portStr = getenv("hama.pipes.command.port");
       int sock = -1;
       FILE* stream = NULL;
@@ -1218,10 +1276,10 @@ namespace HamaPipes {
         HADOOP_ASSERT(connect(sock, (sockaddr*) &addr, sizeof(addr)) == 0,
                       string("problem connecting command socket: ") +
                       strerror(errno));
-
+        
         stream = fdopen(sock, "r");
         outStream = fdopen(sock, "w");
-
+        
         // increase buffer size
         int bufsize = 128*1024;
         int setbuf;
@@ -1229,14 +1287,16 @@ namespace HamaPipes {
         bufout = new char[bufsize];
         setbuf = setvbuf(stream, bufin, _IOFBF, bufsize);
         HADOOP_ASSERT(setbuf == 0, string("problem with setvbuf for inStream: ")
-                                     + strerror(errno));
+                      + strerror(errno));
         setbuf = setvbuf(outStream, bufout, _IOFBF, bufsize);
         HADOOP_ASSERT(setbuf == 0, string("problem with setvbuf for outStream: ")
-                                     + strerror(errno));
-          
+                      + strerror(errno));
+        
         connection = new BinaryProtocol(stream, context, outStream);
-        if(logging)fprintf(stderr,"HamaPipes::runTask - connected to GroomServer Port: %s\n", portStr);  
-          
+        if(logging)
+          fprintf(stderr,"HamaPipes::runTask - connected to GroomServer Port: %s\n",
+                  portStr);
+        
       } else if (getenv("hama.pipes.command.file")) {
         char* filename = getenv("hama.pipes.command.file");
         string outFilename = filename;
@@ -1249,18 +1309,18 @@ namespace HamaPipes {
         fprintf(stderr,"HamaPipes::runTask - Connection couldn't be initialized!\n");
         return -1;
       }
- 
+      
       context->setProtocol(connection, connection->getUplink());
-        
+      
       //pthread_t pingThread;
       //pthread_create(&pingThread, NULL, ping, (void*)(context));
       
       context->waitForTask();
-        
+      
       //while (!context->isDone()) {
-        //context->nextKey();
+      //context->nextKey();
       //}
-        
+      
       context->closeAll();
       connection->getUplink()->sendCMD(DONE);
       
@@ -1286,12 +1346,12 @@ namespace HamaPipes {
       }
       if (outStream != NULL) {
         //fclose(outStream);
-      } 
+      }
       delete bufin;
       delete bufout;
       return true;
     } catch (Error& err) {
-      fprintf(stderr, "Hama Pipes Exception: %s\n", 
+      fprintf(stderr, "Hama Pipes Exception: %s\n",
               err.getMessage().c_str());
       return false;
     }

Added: hama/trunk/c++/src/main/native/utils/impl/Splitter.cc
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/main/native/utils/impl/Splitter.cc?rev=1527466&view=auto
==============================================================================
--- hama/trunk/c++/src/main/native/utils/impl/Splitter.cc (added)
+++ hama/trunk/c++/src/main/native/utils/impl/Splitter.cc Mon Sep 30 07:26:15 2013
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "hadoop/Splitter.hh"
+
+#include <string>
+#include <vector>
+
+namespace HadoopUtils {
+
+
+  Splitter::Splitter ( const std::string& src, const std::string& delim ) {
+    reset ( src, delim );
+  }
+    
+  std::string& Splitter::operator[] ( size_type i ) {
+    return _tokens.at ( i );
+  }
+        
+  Splitter::size_type Splitter::size() {
+    return _tokens.size();
+  }
+        
+  void Splitter::reset ( const std::string& src, const std::string& delim ) {
+    std::vector<std::string> tokens;
+    std::string::size_type start = 0;
+    std::string::size_type end;
+            
+    for ( ; ; ) {
+      end = src.find ( delim, start );
+      tokens.push_back ( src.substr ( start, end - start ) );
+            
+      // We just copied the last token
+      if ( end == std::string::npos )
+        break;
+                
+        // Exclude the delimiter in the next search
+        start = end + delim.size();
+    }
+            
+    _tokens.swap ( tokens );
+  }
+
+}

Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesApplication.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesApplication.java?rev=1527466&r1=1527465&r2=1527466&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesApplication.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesApplication.java Mon Sep 30 07:26:15 2013
@@ -213,9 +213,6 @@ public class PipesApplication<K1 extends
     LOG.debug("DEBUG: cmd: " + cmd);
     process = runClient(cmd, env); // fork c++ binary
 
-    LOG.debug("DEBUG: waiting for Client at "
-        + serverSocket.getLocalSocketAddress());
-
     try {
       if (!streamingEnabled) {
         LOG.debug("DEBUG: waiting for Client at "

Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java?rev=1527466&r1=1527465&r2=1527466&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java Mon Sep 30 07:26:15 2013
@@ -35,8 +35,6 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.bsp.sync.SyncException;
-import org.apache.hama.pipes.protocol.UplinkReader;
-import org.apache.hama.pipes.protocol.StreamingProtocol.StreamingUplinkReaderThread;
 import org.apache.hama.util.KeyValuePair;
 
 /**

Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java?rev=1527466&r1=1527465&r2=1527466&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java Mon Sep 30 07:26:15 2013
@@ -315,10 +315,10 @@ public class UplinkReader<K1 extends Wri
 
       WritableUtils.writeVInt(stream, MessageType.READ_KEYVALUE.code);
       if (pair != null) {
-        binProtocol.writeObject(pair.getKey());
-        binProtocol.writeObject(pair.getValue());
-
+        binProtocol.writeObject(new Text(pair.getKey().toString()));
         String valueStr = pair.getValue().toString();
+        binProtocol.writeObject(new Text(valueStr));
+
         LOG.debug("Responded MessageType.READ_KEYVALUE - Key: "
             + pair.getKey()
             + " Value: "



Mime
View raw message