hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From millec...@apache.org
Subject svn commit: r1518963 [1/2] - in /hama/trunk: ./ c++/ c++/pipes/ c++/src/ c++/src/main/ c++/src/main/native/ c++/src/main/native/pipes/ c++/src/main/native/pipes/api/ c++/src/main/native/pipes/api/hama/ c++/src/main/native/pipes/impl/ c++/src/main/nativ...
Date Fri, 30 Aug 2013 13:26:01 GMT
Author: millecker
Date: Fri Aug 30 13:26:00 2013
New Revision: 1518963

URL: http://svn.apache.org/r1518963
Log:
HAMA-749: Build for C++ Pipes

Added:
    hama/trunk/c++/pom.xml
    hama/trunk/c++/src/
    hama/trunk/c++/src/CMakeLists.txt
    hama/trunk/c++/src/JNIFlags.cmake
    hama/trunk/c++/src/main/
    hama/trunk/c++/src/main/native/
    hama/trunk/c++/src/main/native/pipes/
    hama/trunk/c++/src/main/native/pipes/api/
    hama/trunk/c++/src/main/native/pipes/api/hama/
    hama/trunk/c++/src/main/native/pipes/api/hama/Pipes.hh
    hama/trunk/c++/src/main/native/pipes/api/hama/TemplateFactory.hh
    hama/trunk/c++/src/main/native/pipes/impl/
    hama/trunk/c++/src/main/native/pipes/impl/HamaPipes.cc
    hama/trunk/c++/src/main/native/utils/
    hama/trunk/c++/src/main/native/utils/api/
    hama/trunk/c++/src/main/native/utils/api/hadoop/
    hama/trunk/c++/src/main/native/utils/api/hadoop/SerialUtils.hh
    hama/trunk/c++/src/main/native/utils/api/hadoop/Splitter.hh
    hama/trunk/c++/src/main/native/utils/api/hadoop/StringUtils.hh
    hama/trunk/c++/src/main/native/utils/impl/
    hama/trunk/c++/src/main/native/utils/impl/SerialUtils.cc
    hama/trunk/c++/src/main/native/utils/impl/StringUtils.cc
    hama/trunk/c++/src/main/native/utils/m4/
Removed:
    hama/trunk/c++/pipes/
    hama/trunk/c++/utils/
Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/c++/   (props changed)
    hama/trunk/pom.xml
    hama/trunk/src/assemble/bin.xml

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1518963&r1=1518962&r2=1518963&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Fri Aug 30 13:26:00 2013
@@ -21,6 +21,7 @@ Release 0.6.3 (unreleased changes)
 
   IMPROVEMENTS
 
+   HAMA-749: Build for C++ Pipes (Martin Illecker)
    HAMA-796: Add Vector multiply Matrix for DoubleVector as well as DenseDoubleVector. (Yexi Jiang)
    HAMA-770: Use a unified model to represent linear regression, logistic regression, MLP, autoencoder, and deepNets (Yexi Jiang)
    HAMA-671: Clean up Maven build scripts (edwardyoon)

Propchange: hama/trunk/c++/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Fri Aug 30 13:26:00 2013
@@ -1 +1,3 @@
 install
+
+target

Added: hama/trunk/c++/pom.xml
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/pom.xml?rev=1518963&view=auto
==============================================================================
--- hama/trunk/c++/pom.xml (added)
+++ hama/trunk/c++/pom.xml Fri Aug 30 13:26:00 2013
@@ -0,0 +1,110 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+ 
+ http://www.apache.org/licenses/LICENSE-2.0
+ 
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    
+    <parent>
+      <groupId>org.apache.hama</groupId>
+      <artifactId>hama-parent</artifactId>
+      <version>0.6.3-SNAPSHOT</version>
+    </parent>
+    
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>org.apache.hama</groupId>
+    <artifactId>hama-pipes</artifactId>
+    <version>0.6.3-SNAPSHOT</version>
+    <name>pipes</name>
+    <description>Apache Hama Pipes</description>
+    <packaging>pom</packaging>
+
+    <profiles>
+      <profile>
+        <id>native</id>
+        <activation>
+          <activeByDefault>true</activeByDefault>
+        </activation>
+        <build>
+          <plugins>
+            <plugin>
+              <groupId>org.apache.maven.plugins</groupId>
+              <artifactId>maven-antrun-plugin</artifactId>
+              <executions>
+                <execution>
+                  <id>make</id>
+                  <phase>compile</phase>
+                  <goals><goal>run</goal></goals>
+                  <configuration>
+                    <target>
+                      <mkdir dir="${project.build.directory}/native"/>
+                      <exec executable="cmake" dir="${project.build.directory}/native"
+                          failonerror="true">
+                        <arg line="${basedir}/src/ -DJVM_ARCH_DATA_MODEL=${sun.arch.data.model}"/>
+                      </exec>
+                      <exec executable="make" dir="${project.build.directory}/native" failonerror="true">
+                        <arg line="VERBOSE=1"/>
+                      </exec>
+                      <!-- The second make is a workaround for HADOOP-9215.  It can
+                           be removed when version 2.6 of cmake is no longer supported. -->
+                      <exec executable="make" dir="${project.build.directory}/native" failonerror="true"></exec>
+                    </target>
+                  </configuration>
+                </execution>
+                <!-- TODO wire here native testcases
+                <execution>
+                  <id>test</id>
+                  <phase>test</phase>
+                  <goals>
+                    <goal>test</goal>
+                  </goals>
+                  <configuration>
+                    <destDir>${project.build.directory}/native/target</destDir>
+                  </configuration>
+                </execution>
+                -->
+              </executions>
+            </plugin>
+          </plugins>
+        </build>
+      </profile>
+    </profiles>
+<!--
+    <build>
+      <plugins>
+       <plugin>
+          <artifactId>maven-antrun-plugin</artifactId>
+          <executions>
+            <execution>
+              <id>compile</id>
+              <phase>generate-sources</phase>
+              <goals>
+                <goal>run</goal>
+              </goals>
+              <configuration>
+                <target>
+                  <mkdir dir="${basedir}/../target/native"/>
+                  <copy toDir="${basedir}/../target/native">
+                    <fileset dir="${basedir}/src/main/native"/>
+                  </copy>
+                </target>
+              </configuration>
+            </execution>
+          </executions>
+        </plugin>
+      </plugins>
+    </build>
+-->
+</project>

Added: hama/trunk/c++/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/CMakeLists.txt?rev=1518963&view=auto
==============================================================================
--- hama/trunk/c++/src/CMakeLists.txt (added)
+++ hama/trunk/c++/src/CMakeLists.txt Fri Aug 30 13:26:00 2013
@@ -0,0 +1,76 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+ 
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+cmake_minimum_required(VERSION 2.6 FATAL_ERROR)
+find_package(OpenSSL REQUIRED)
+
+set(CMAKE_BUILD_TYPE, Release)
+
+set(PIPES_FLAGS "-g -Wall -O2 -D_REENTRANT -D_GNU_SOURCE")
+set(PIPES_FLAGS "${PIPES_FLAGS} -D_LARGEFILE_SOURCE -D_FILE_OFFSET_BITS=64")
+set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${PIPES_FLAGS}")
+set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${PIPES_FLAGS}")
+
+include(JNIFlags.cmake NO_POLICY_SCOPE)
+
+function(output_directory TGT DIR)
+    SET_TARGET_PROPERTIES(${TGT} PROPERTIES
+        RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/${DIR}")
+   SET_TARGET_PROPERTIES(${TGT} PROPERTIES
+        ARCHIVE_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/${DIR}")
+    SET_TARGET_PROPERTIES(${TGT} PROPERTIES
+        LIBRARY_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/${DIR}")
+endfunction(output_directory TGT DIR)
+
+include_directories(
+    main/native/utils/api
+    main/native/pipes/api
+    ${CMAKE_CURRENT_SOURCE_DIR}
+    ${OPENSSL_INCLUDE_DIR}
+)
+
+# Example programs
+# add_executable(wordcount-simple main/native/examples/impl/wordcount-simple.cc)
+# target_link_libraries(wordcount-simple hadooppipes hadooputils)
+# output_directory(wordcount-simple examples)
+
+# add_executable(wordcount-part main/native/examples/impl/wordcount-part.cc)
+# target_link_libraries(wordcount-part hadooppipes hadooputils)
+# output_directory(wordcount-part examples)
+
+# add_executable(wordcount-nopipe main/native/examples/impl/wordcount-nopipe.cc)
+# target_link_libraries(wordcount-nopipe hadooppipes hadooputils)
+# output_directory(wordcount-nopipe examples)
+
+# add_executable(pipes-sort main/native/examples/impl/sort.cc)
+# target_link_libraries(pipes-sort hadooppipes hadooputils)
+# output_directory(pipes-sort examples)
+
+add_library(hadooputils STATIC
+    main/native/utils/impl/StringUtils.cc
+    main/native/utils/impl/SerialUtils.cc
+)
+
+add_library(hamapipes STATIC
+    main/native/pipes/impl/HamaPipes.cc
+)
+
+target_link_libraries(hamapipes
+    ${OPENSSL_LIBRARIES}
+    pthread
+)

Added: hama/trunk/c++/src/JNIFlags.cmake
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/JNIFlags.cmake?rev=1518963&view=auto
==============================================================================
--- hama/trunk/c++/src/JNIFlags.cmake (added)
+++ hama/trunk/c++/src/JNIFlags.cmake Fri Aug 30 13:26:00 2013
@@ -0,0 +1,118 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+cmake_minimum_required(VERSION 2.6 FATAL_ERROR)
+
+# If JVM_ARCH_DATA_MODEL is 32, compile all binaries as 32-bit.
+# This variable is set by maven.
+if (JVM_ARCH_DATA_MODEL EQUAL 32)
+    # Force 32-bit code generation on amd64/x86_64, ppc64, sparc64
+    if (CMAKE_COMPILER_IS_GNUCC AND CMAKE_SYSTEM_PROCESSOR MATCHES ".*64")
+        set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -m32")
+        set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -m32")
+        set(CMAKE_LD_FLAGS "${CMAKE_LD_FLAGS} -m32")
+    endif ()
+    if (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" OR CMAKE_SYSTEM_PROCESSOR STREQUAL "amd64")
+        # Set CMAKE_SYSTEM_PROCESSOR to ensure that find_package(JNI) will use
+        # the 32-bit version of libjvm.so.
+        set(CMAKE_SYSTEM_PROCESSOR "i686")
+    endif ()
+endif (JVM_ARCH_DATA_MODEL EQUAL 32)
+
+# Determine float ABI of JVM on ARM Linux
+if (CMAKE_SYSTEM_PROCESSOR MATCHES "^arm" AND CMAKE_SYSTEM_NAME STREQUAL "Linux")
+    find_program(READELF readelf)
+    if (READELF MATCHES "NOTFOUND")
+        message(WARNING "readelf not found; JVM float ABI detection disabled")
+    else (READELF MATCHES "NOTFOUND")
+        execute_process(
+            COMMAND ${READELF} -A ${JAVA_JVM_LIBRARY}
+            OUTPUT_VARIABLE JVM_ELF_ARCH
+            ERROR_QUIET)
+        if (NOT JVM_ELF_ARCH MATCHES "Tag_ABI_VFP_args: VFP registers")
+            message("Soft-float JVM detected")
+
+            # Test compilation with -mfloat-abi=softfp using an arbitrary libc function
+            # (typically fails with "fatal error: bits/predefs.h: No such file or directory"
+            # if soft-float dev libraries are not installed)
+            include(CMakePushCheckState)
+            cmake_push_check_state()
+            set(CMAKE_REQUIRED_FLAGS "${CMAKE_REQUIRED_FLAGS} -mfloat-abi=softfp")
+            include(CheckSymbolExists)
+            check_symbol_exists(exit stdlib.h SOFTFP_AVAILABLE)
+            if (NOT SOFTFP_AVAILABLE)
+                message(FATAL_ERROR "Soft-float dev libraries required (e.g. 'apt-get install libc6-dev-armel' on Debian/Ubuntu)")
+            endif (NOT SOFTFP_AVAILABLE)
+            cmake_pop_check_state()
+
+            set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mfloat-abi=softfp")
+        endif ()
+    endif (READELF MATCHES "NOTFOUND")
+endif (CMAKE_SYSTEM_PROCESSOR MATCHES "^arm" AND CMAKE_SYSTEM_NAME STREQUAL "Linux")
+
+IF("${CMAKE_SYSTEM}" MATCHES "Linux")
+    #
+    # Locate JNI_INCLUDE_DIRS and JNI_LIBRARIES.
+    # Since we were invoked from Maven, we know that the JAVA_HOME environment
+    # variable is valid.  So we ignore system paths here and just use JAVA_HOME.
+    #
+    FILE(TO_CMAKE_PATH "$ENV{JAVA_HOME}" _JAVA_HOME)
+    IF(CMAKE_SYSTEM_PROCESSOR MATCHES "^i.86$")
+        SET(_java_libarch "i386")
+    ELSEIF (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" OR CMAKE_SYSTEM_PROCESSOR STREQUAL "amd64")
+        SET(_java_libarch "amd64")
+    ELSEIF (CMAKE_SYSTEM_PROCESSOR MATCHES "^arm")
+        SET(_java_libarch "arm")
+    ELSE()
+        SET(_java_libarch ${CMAKE_SYSTEM_PROCESSOR})
+    ENDIF()
+    SET(_JDK_DIRS "${_JAVA_HOME}/jre/lib/${_java_libarch}/*"
+                  "${_JAVA_HOME}/jre/lib/${_java_libarch}"
+                  "${_JAVA_HOME}/jre/lib/*"
+                  "${_JAVA_HOME}/jre/lib"
+                  "${_JAVA_HOME}/lib/*"
+                  "${_JAVA_HOME}/lib"
+                  "${_JAVA_HOME}/include/*"
+                  "${_JAVA_HOME}/include"
+                  "${_JAVA_HOME}"
+    )
+    FIND_PATH(JAVA_INCLUDE_PATH
+        NAMES jni.h 
+        PATHS ${_JDK_DIRS}
+        NO_DEFAULT_PATH)
+    #In IBM java, it's jniport.h instead of jni_md.h
+    FIND_PATH(JAVA_INCLUDE_PATH2 
+        NAMES jni_md.h jniport.h
+        PATHS ${_JDK_DIRS}
+        NO_DEFAULT_PATH)
+    SET(JNI_INCLUDE_DIRS ${JAVA_INCLUDE_PATH} ${JAVA_INCLUDE_PATH2})
+    FIND_LIBRARY(JAVA_JVM_LIBRARY
+        NAMES jvm JavaVM
+        PATHS ${_JDK_DIRS}
+        NO_DEFAULT_PATH)
+    SET(JNI_LIBRARIES ${JAVA_JVM_LIBRARY})
+    MESSAGE("JAVA_HOME=${JAVA_HOME}, JAVA_JVM_LIBRARY=${JAVA_JVM_LIBRARY}")
+    MESSAGE("JAVA_INCLUDE_PATH=${JAVA_INCLUDE_PATH}, JAVA_INCLUDE_PATH2=${JAVA_INCLUDE_PATH2}")
+    IF(JAVA_JVM_LIBRARY AND JAVA_INCLUDE_PATH AND JAVA_INCLUDE_PATH2)
+        MESSAGE("Located all JNI components successfully.")
+    ELSE()
+        MESSAGE(FATAL_ERROR "Failed to find a viable JVM installation under JAVA_HOME.")
+    ENDIF()
+ELSE()
+    find_package(JNI REQUIRED)
+ENDIF()

Added: hama/trunk/c++/src/main/native/pipes/api/hama/Pipes.hh
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/main/native/pipes/api/hama/Pipes.hh?rev=1518963&view=auto
==============================================================================
--- hama/trunk/c++/src/main/native/pipes/api/hama/Pipes.hh (added)
+++ hama/trunk/c++/src/main/native/pipes/api/hama/Pipes.hh Fri Aug 30 13:26:00 2013
@@ -0,0 +1,374 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef HAMA_PIPES_HH
+#define HAMA_PIPES_HH
+
+#ifdef SWIG
+%module (directors="1") HamaPipes
+%include "std_string.i"
+%feature("director") BSP;
+%feature("director") Partitioner;
+%feature("director") RecordReader;
+%feature("director") RecordWriter;
+%feature("director") Factory;
+#else
+#include <string>
+#include <vector>
+#endif
+
+#include <stdint.h>
+
+using std::string;
+using std::vector;
+
+namespace HamaPipes {
+
+/**
+ * This interface defines the interface between application code and the 
+ * foreign code interface to Hadoop Map/Reduce.
+ */
+
+/**
+ * A BSPJob defines the properties for a job.
+ */
+class BSPJob {
+public:
+  virtual bool hasKey(const string& key) const = 0;
+  virtual const string& get(const string& key) const = 0;
+  virtual int getInt(const string& key) const = 0;
+  virtual float getFloat(const string& key) const = 0;
+  virtual bool getBoolean(const string&key) const = 0;
+  virtual ~BSPJob() {}
+};
+
+/**
+ * Task context provides the information about the task and job.
+ */
+class TaskContext {
+public:
+  /**
+   * Counter to keep track of a property and its value.
+   */
+  class Counter {
+  private:
+    int id;
+  public:
+    Counter(int counterId) : id(counterId) {}
+    Counter(const Counter& counter) : id(counter.id) {}
+
+    int getId() const { return id; }
+  };
+  
+  /**
+   * Get the BSPJob for the current task.
+   */
+  virtual const BSPJob* getBSPJob() = 0;
+
+  /**
+   * Get the current key. 
+   * @return the current key
+   */
+  //virtual const string& getInputKey() = 0;
+
+  /**
+   * Get the current value. 
+   * @return the current value
+   */
+  //virtual const string& getInputValue() = 0;
+
+  /**
+   * Generate an output record
+   */
+  //virtual void emit(const string& key, const string& value) = 0;
+
+  /**
+   * Mark your task as having made progress without changing the status 
+   * message.
+   */
+  //virtual void progress() = 0;
+
+  /**
+   * Set the status message and call progress.
+   */
+  //virtual void setStatus(const string& status) = 0;
+
+  /**
+   * Register a counter with the given group and name.
+   */
+  //virtual Counter* getCounter(const string& group, const string& name) = 0;
+
+  /**
+   * Increment the value of the counter with the given amount.
+   */
+  //virtual void incrementCounter(const Counter* counter, uint64_t amount) = 0;
+  virtual void incrementCounter(const string& group, const string& name, uint64_t amount) = 0;
+  
+  virtual ~TaskContext() {}
+};
+    
+    
+/**
+ * SequenceFile Connector
+ */
+class SequenceFileConnector {
+public:
+  /**
+   * Open SequenceFile with option "r" or "w"
+   * key and value type of the values stored in the SequenceFile
+   * @return the corresponding fileID
+   */
+  virtual int sequenceFileOpen(const string& path, const string& option, const string& keyType, const string& valueType) = 0;
+    
+  /**
+   * Read next key/value pair from the SequenceFile with fileID
+   */
+  virtual bool sequenceFileReadNext(int fileID, string& key, string& value) = 0;
+
+  /**
+   * Append the next key/value pair to the SequenceFile with fileID
+   */
+  virtual bool sequenceFileAppend(int fileID, const string& key, const string& value) = 0;
+    
+  /**
+   * Close SequenceFile
+   */
+  virtual bool sequenceFileClose(int fileID) = 0;
+};    
+
+
+class BSPContext: public TaskContext, public SequenceFileConnector {
+public:
+
+  /**
+   * Access the InputSplit of the mapper.
+   */
+  //virtual const string& getInputSplit() = 0;
+
+  /**
+   * Get the name of the key class of the input to this task.
+   */
+  virtual const string& getInputKeyClass() = 0;
+
+  /**
+   * Get the name of the value class of the input to this task.
+   */
+  virtual const string& getInputValueClass() = 0;
+    
+    
+    
+  /**
+   * Send a data with a tag to another BSPSlave corresponding to hostname.
+   * Messages sent by this method are not guaranteed to be received in a sent
+   * order.
+   */
+  virtual void sendMessage(const string& peerName, const string& msg) = 0;
+    
+  /**
+   * @return A message from the peer's received messages queue (a FIFO).
+   */
+  virtual const string& getCurrentMessage() = 0;
+    
+  /**
+   * @return The number of messages in the peer's received messages queue.
+   */
+  virtual int getNumCurrentMessages() = 0;
+    
+  /**
+   * Barrier Synchronization.
+   * 
+   * Sends all the messages in the outgoing message queues to the corresponding
+   * remote peers.
+   */
+  virtual void sync() = 0;
+    
+  /**
+   * @return the count of current super-step
+   */
+  virtual long getSuperstepCount() = 0;
+     
+  /**
+   * @return the name of this peer in the format "hostname:port".
+   */ 
+  virtual const string& getPeerName() = 0;
+    
+  /**
+   * @return the name of n-th peer from sorted array by name.
+   */
+  virtual const string& getPeerName(int index) = 0;
+    
+  /**
+   * @return the index of this peer from sorted array by name.
+   */
+  virtual int getPeerIndex() = 0;
+    
+  /**
+   * @return the names of all the peers executing tasks from the same job
+   *         (including this peer).
+   */
+  virtual vector<string> getAllPeerNames() = 0;
+    
+  /**
+   * @return the number of peers
+   */
+  virtual int getNumPeers() = 0;
+    
+  /**
+   * Clears all queues entries.
+   */
+  virtual void clear() = 0;
+    
+  /**
+   * Writes a key/value pair to the output collector
+   */
+  virtual void write(const string& key, const string& value) = 0;
+    
+  /**
+   * Deserializes the next input key value into the given objects;
+   */
+  virtual bool readNext(string& key, string& value) = 0;
+    
+    /**
+     * Reads the next key value pair and returns it as a pair. It may reuse a
+     * {@link KeyValuePair} instance to save garbage collection time.
+     * 
+     * @return null if there are no records left.
+     * @throws IOException
+     */
+    //public KeyValuePair<K1, V1> readNext() throws IOException;
+    
+  /**
+   * Closes the input and opens it right away, so that the file pointer is at
+   * the beginning again.
+   */
+  virtual void reopenInput() = 0;
+    
+};
+    
+class Closable {
+public:
+  virtual void close() {}
+  virtual ~Closable() {}
+};
+
+/**
+ * The application's BSP class to do bsp.
+ */
+class BSP: public Closable {
+public:
+  /**
+   * This method is called before the BSP method. It can be used for setup
+   * purposes.
+   */
+  virtual void setup(BSPContext& context) = 0;
+   
+  /**
+   * This method is your computation method, the main work of your BSP should be
+   * done here.
+   */
+  virtual void bsp(BSPContext& context) = 0;
+    
+  /**
+   * This method is called after the BSP method. It can be used for cleanup
+   * purposes. Cleanup is guaranteed to be called after the BSP runs, even in
+   * case of exceptions.
+   */
+  virtual void cleanup(BSPContext& context) = 0;
+};
+
+/**
+ * User code to decide where each key should be sent.
+ */
+class Partitioner {
+public:
+    
+    virtual int partition(const string& key,const string& value, int32_t numTasks) = 0;
+    virtual ~Partitioner() {}
+};
+    
+/**
+ * For applications that want to read the input directly for the map function
+ * they can define RecordReaders in C++.
+ */
+class RecordReader: public Closable {
+public:
+  virtual bool next(string& key, string& value) = 0;
+
+  /**
+   * The progress of the record reader through the split as a value between
+   * 0.0 and 1.0.
+   */
+  virtual float getProgress() = 0;
+};
+
+/**
+ * An object to write key/value pairs as they are emitted from the reduce.
+ */
+class RecordWriter: public Closable {
+public:
+  virtual void emit(const string& key,
+                    const string& value) = 0;
+};
+
+/**
+ * A factory to create the necessary application objects.
+ */
+class Factory {
+public:
+  virtual BSP* createBSP(BSPContext& context) const = 0;
+
+  /**
+   * Create an application partitioner object.
+   * @return the new partitioner or NULL, if the default partitioner should be 
+   * used.
+   */
+  virtual Partitioner* createPartitioner(BSPContext& context) const {
+    return NULL;
+  }
+    
+  /**
+   * Create an application record reader.
+   * @return the new RecordReader or NULL, if the Java RecordReader should be
+   *    used.
+   */
+  virtual RecordReader* createRecordReader(BSPContext& context) const {
+    return NULL; 
+  }
+
+  /**
+   * Create an application record writer.
+   * @return the new RecordWriter or NULL, if the Java RecordWriter should be
+   *    used.
+   */
+  virtual RecordWriter* createRecordWriter(BSPContext& context) const {
+    return NULL;
+  }
+
+  virtual ~Factory() {}
+};
+
+/**
+ * Run the assigned task in the framework.
+ * The user's main function should set the various functions using the 
+ * set* functions above and then call this.
+ * @return true, if the task succeeded.
+ */
+bool runTask(const Factory& factory);
+
+}
+
+#endif

Added: hama/trunk/c++/src/main/native/pipes/api/hama/TemplateFactory.hh
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/main/native/pipes/api/hama/TemplateFactory.hh?rev=1518963&view=auto
==============================================================================
--- hama/trunk/c++/src/main/native/pipes/api/hama/TemplateFactory.hh (added)
+++ hama/trunk/c++/src/main/native/pipes/api/hama/TemplateFactory.hh Fri Aug 30 13:26:00 2013
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef HAMA_PIPES_TEMPLATE_FACTORY_HH
+#define HAMA_PIPES_TEMPLATE_FACTORY_HH
+
+namespace HamaPipes {
+
+  template <class BSP>
+  class TemplateFactory2: public Factory {
+  public:
+    BSP* createBSP(BSPContext& context) const {
+      return new BSP(context);
+    }
+  };
+
+  template <class BSP, class partitioner=void>
+  class TemplateFactory: public TemplateFactory2<BSP> {
+  public:
+      partitioner* createPartitioner(BSPContext& context) const {
+          return new partitioner(context);
+      }
+  };
+  template <class BSP>
+  class TemplateFactory<BSP,void>
+      : public TemplateFactory2<BSP> {
+  };
+    
+}
+
+#endif

Added: hama/trunk/c++/src/main/native/pipes/impl/HamaPipes.cc
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/main/native/pipes/impl/HamaPipes.cc?rev=1518963&view=auto
==============================================================================
--- hama/trunk/c++/src/main/native/pipes/impl/HamaPipes.cc (added)
+++ hama/trunk/c++/src/main/native/pipes/impl/HamaPipes.cc Fri Aug 30 13:26:00 2013
@@ -0,0 +1,1299 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "hama/Pipes.hh"
+#include "hadoop/SerialUtils.hh"
+#include "hadoop/StringUtils.hh"
+
+#include <map>
+#include <vector>
+
+#include <errno.h>
+#include <netinet/in.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <strings.h>
+#include <sys/socket.h>
+#include <pthread.h>
+#include <iostream>
+#include <fstream>
+
+#include <openssl/hmac.h>
+#include <openssl/buffer.h>
+
+#define stringify( name ) # name
+
+using std::map;
+using std::string;
+using std::vector;
+using std::cout;
+using std::endl;
+
+using namespace HadoopUtils;
+
+namespace HamaPipes {
+
+  bool logging;
+  
+  /********************************************/
+  /****************** BSPJob ******************/  
+  /********************************************/
+  /**
+   * In-memory BSPJob backed by a string-to-string map; all typed getters
+   * are parsed on demand from the stored string values.
+   */
+  class BSPJobImpl: public BSPJob {
+  private:
+    map<string, string> conf;
+  public:
+    /// Store (or overwrite) one configuration entry.
+    void set(const string& key, const string& value) {
+      conf[key] = value;
+    }
+
+    /// @return true when the configuration contains the given key.
+    virtual bool hasKey(const string& key) const {
+      return conf.count(key) != 0;
+    }
+
+    /**
+     * Look up a raw string value.
+     * @throws Error when the key is absent.
+     */
+    virtual const string& get(const string& key) const {
+      map<string,string>::const_iterator pos = conf.find(key);
+      if (pos == conf.end()) {
+        throw Error("Key " + key + " not found in BSPJob");
+      }
+      return pos->second;
+    }
+
+    /// Parse the value of key as an int; throws if the key is absent.
+    virtual int getInt(const string& key) const {
+      return toInt(get(key));
+    }
+
+    /// Parse the value of key as a float; throws if the key is absent.
+    virtual float getFloat(const string& key) const {
+      return toFloat(get(key));
+    }
+
+    /// Parse the value of key as a boolean; throws if the key is absent.
+    virtual bool getBoolean(const string&key) const {
+      return toBool(get(key));
+    }
+  };
+    
+  /********************************************/
+  /************* DownwardProtocol *************/  
+  /********************************************/
+  /**
+   * Callback interface for events flowing from the driver down to the C++
+   * task. BinaryProtocol::nextEvent() deserializes one command and invokes
+   * exactly one of these methods.
+   */
+  class DownwardProtocol {
+  public:
+    // Handshake: driver announces its protocol version.
+    virtual void start(int protocol) = 0;
+    // Job configuration as a flat list of alternating key/value strings.
+    virtual void setBSPJob(vector<string> values) = 0;
+    // Class names of the input key/value types.
+    virtual void setInputTypes(string keyType, string valueType) = 0;
+    // Delivers one key/value pair (response to a read request).
+    virtual void setKeyValue(const string& _key, const string& _value) = 0;
+      
+    // Lifecycle commands; the piped* flags control reader/writer creation.
+    virtual void runBsp(bool pipedInput, bool pipedOutput) = 0;
+    virtual void runCleanup(bool pipedInput, bool pipedOutput) = 0;
+    virtual void runSetup(bool pipedInput, bool pipedOutput) = 0;
+    virtual void runPartition(const string& key, const string& value, int32_t numTasks) = 0;  
+      
+    // Deliver the result of a previously issued upward request; the overload
+    // used depends on the response's payload type.
+    virtual void setNewResult(int32_t value) = 0;
+    virtual void setNewResult(int64_t value) = 0;  
+    virtual void setNewResult(const string&  value) = 0;
+    virtual void setNewResult(vector<string> value) = 0;
+      
+    //virtual void reduceKey(const string& key) = 0;
+    //virtual void reduceValue(const string& value) = 0;
+    // Orderly shutdown vs. driver-requested abort.
+    virtual void close() = 0;
+    virtual void abort() = 0;
+    virtual ~DownwardProtocol() {}
+  };
+
+  /********************************************/
+  /************** UpwardProtocol **************/  
+  /********************************************/
+  /**
+   * Interface for sending commands from the C++ task up to the driver.
+   * The sendCMD overloads differ only in the payload that follows the
+   * command id on the wire.
+   */
+  class UpwardProtocol {
+  public:
+    virtual void sendCMD(int32_t cmd) = 0;
+    virtual void sendCMD(int32_t cmd, int32_t value) = 0;
+    virtual void sendCMD(int32_t cmd, int32_t value, const string values[], int size) = 0;
+    virtual void sendCMD(int32_t cmd, const string& value) = 0;
+    virtual void sendCMD(int32_t cmd, const string values[], int size) = 0;
+      
+    //virtual void registerCounter(int id, const string& group, const string& name) = 0;
+    //virtual void incrementCounter(const TaskContext::Counter* counter, uint64_t amount) = 0;
+    // Increment the named counter on the driver side by amount.
+    virtual void incrementCounter(const string& group, const string& name, uint64_t amount) = 0;
+    virtual ~UpwardProtocol() {}
+  };
+    
+  /********************************************/
+  /***************** Protocol *****************/  
+  /********************************************/
+  /**
+   * A full duplex connection to the driver: nextEvent() pumps one downward
+   * command, getUplink() exposes the channel for upward requests.
+   */
+  class Protocol {
+  public:
+    virtual void nextEvent() = 0;
+    virtual UpwardProtocol* getUplink() = 0;
+    virtual ~Protocol(){}
+  };
+    
+  /********************************************/
+  /*************** MESSAGE_TYPE ***************/  
+  /********************************************/
+  /**
+   * Wire-protocol command ids. The integer value of each enumerator is
+   * serialized directly, so the order here must NOT be changed — it
+   * presumably mirrors the MessageType ordering on the Java driver side
+   * (confirm against the Java source before editing).
+   */
+  enum MESSAGE_TYPE {
+      START_MESSAGE, SET_BSPJOB_CONF, SET_INPUT_TYPES,       
+      RUN_SETUP, RUN_BSP, RUN_CLEANUP,
+      READ_KEYVALUE, WRITE_KEYVALUE, 
+      GET_MSG, GET_MSG_COUNT, 
+      SEND_MSG, SYNC, 
+      GET_ALL_PEERNAME, GET_PEERNAME,
+      GET_PEER_INDEX, GET_PEER_COUNT, GET_SUPERSTEP_COUNT,
+      REOPEN_INPUT, CLEAR,
+      CLOSE, ABORT,
+      DONE, TASK_DONE, 
+      REGISTER_COUNTER, INCREMENT_COUNTER,
+      SEQFILE_OPEN, SEQFILE_READNEXT, 
+      SEQFILE_APPEND, SEQFILE_CLOSE,
+      PARTITION_REQUEST, PARTITION_RESPONSE
+  };
+    
+  /* Only needed for debugging output */
+  /* Only needed for debugging output */
+  /* NOTE: indexed by MESSAGE_TYPE value — must stay in exact one-to-one
+   * order with the enum above, or log messages will name the wrong command. */
+  const char* messageTypeNames[] = {
+      stringify( START_MESSAGE ), stringify( SET_BSPJOB_CONF ), stringify( SET_INPUT_TYPES ),       
+      stringify( RUN_SETUP ), stringify( RUN_BSP ), stringify( RUN_CLEANUP ),
+      stringify( READ_KEYVALUE ), stringify( WRITE_KEYVALUE ), 
+      stringify( GET_MSG ), stringify( GET_MSG_COUNT ), 
+      stringify( SEND_MSG ), stringify( SYNC ), 
+      stringify( GET_ALL_PEERNAME ), stringify( GET_PEERNAME ),
+      stringify( GET_PEER_INDEX ), stringify( GET_PEER_COUNT ), stringify( GET_SUPERSTEP_COUNT ),
+      stringify( REOPEN_INPUT ), stringify( CLEAR ),
+      stringify( CLOSE ), stringify( ABORT ),
+      stringify( DONE ), stringify( TASK_DONE ), 
+      stringify( REGISTER_COUNTER ), stringify( INCREMENT_COUNTER ),
+      stringify( SEQFILE_OPEN ), stringify( SEQFILE_READNEXT ),
+      stringify( SEQFILE_APPEND ), stringify( SEQFILE_CLOSE ),
+      stringify( PARTITION_REQUEST ), stringify( PARTITION_RESPONSE )
+    };
+
+  /********************************************/
+  /*********** BinaryUpwardProtocol ***********/  
+  /********************************************/
+  /**
+   * UpwardProtocol over a FILE*: each sendCMD serializes the command id
+   * followed by its payload and flushes, so the driver sees every command
+   * immediately. The exact field order is the wire format — do not reorder.
+   */
+  class BinaryUpwardProtocol: public UpwardProtocol {
+  private:
+    FileOutStream* stream;  // owned; released in the destructor
+  public:
+    BinaryUpwardProtocol(FILE* _stream) {
+      stream = new FileOutStream();
+      // NOTE(review): HADOOP_ASSERT throws on failure, leaking the freshly
+      // allocated stream. Benign in practice (the process is aborting), but
+      // worth confirming/cleaning up if this path can be survived.
+      HADOOP_ASSERT(stream->open(_stream), "problem opening stream");
+        
+    }
+
+    // Command with no payload.
+    virtual void sendCMD(int32_t cmd) {
+      serializeInt(cmd, *stream);
+      stream->flush();
+      if(logging)fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent CMD: %s\n",
+          messageTypeNames[cmd]);
+    }
+      
+    // Command followed by one int.
+    virtual void sendCMD(int32_t cmd, int32_t value) {
+      serializeInt(cmd, *stream);
+      serializeInt(value, *stream);
+      stream->flush();
+      if(logging)fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent CMD: %s with Value: %d\n",messageTypeNames[cmd],value);
+    }
+      
+    // Command followed by one string.
+    virtual void sendCMD(int32_t cmd, const string& value) {
+      serializeInt(cmd, *stream);
+      serializeString(value, *stream);
+      stream->flush();
+      if(logging)fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent CMD: %s with Value: %s\n",messageTypeNames[cmd],value.c_str());
+    }
+      
+    // Command followed by 'size' strings.
+    virtual void sendCMD(int32_t cmd, const string values[], int size) {
+      serializeInt(cmd, *stream);      
+      for (int i=0; i<size; i++) { 
+        serializeString(values[i], *stream);
+        if(logging)fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent CMD: %s with Param%d: %s\n",messageTypeNames[cmd],i+1,values[i].c_str());
+      }
+      stream->flush();
+    }
+      
+    // Command followed by one int, then 'size' strings.
+    virtual void sendCMD(int32_t cmd, int32_t value, const string values[], int size) {
+      serializeInt(cmd, *stream);
+      serializeInt(value, *stream);
+      for (int i=0; i<size; i++) { 
+        serializeString(values[i], *stream);
+        if(logging)fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent CMD: %s with Param%d: %s\n",messageTypeNames[cmd],i+1,values[i].c_str());
+      } 
+      stream->flush();
+    }
+
+    /*
+    virtual void registerCounter(int id, const string& group, 
+                                 const string& name) {
+      serializeInt(REGISTER_COUNTER, *stream);
+      serializeInt(id, *stream);
+      serializeString(group, *stream);
+      serializeString(name, *stream);
+    }
+    
+    virtual void incrementCounter(const TaskContext::Counter* counter, 
+                                  uint64_t amount) {
+      serializeInt(INCREMENT_COUNTER, *stream);
+      serializeInt(counter->getId(), *stream);
+      serializeLong(amount, *stream);
+    }
+    */
+    
+    // Counters are addressed by (group, name) rather than a registered id.
+    virtual void incrementCounter(const string& group, const string& name, uint64_t amount) {
+      serializeInt(INCREMENT_COUNTER, *stream);
+      serializeString(group, *stream);
+      serializeString(name, *stream);
+      serializeLong(amount, *stream);
+      stream->flush();
+      if(logging)fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent incrementCounter\n");
+    }
+      
+    ~BinaryUpwardProtocol() {
+      delete stream;
+    }
+  };
+
+  /********************************************/
+  /************** BinaryProtocol **************/  
+  /********************************************/
+  /**
+   * Protocol implementation over a pair of FILE* pipes: nextEvent() reads
+   * one framed command from the driver (downStream), deserializes its
+   * payload and dispatches it to the DownwardProtocol handler; replies go
+   * out through the owned BinaryUpwardProtocol.
+   */
+  class BinaryProtocol: public Protocol {
+  private:
+    FileInStream* downStream;      // owned; wraps the driver->task pipe
+    DownwardProtocol* handler;     // not owned
+    BinaryUpwardProtocol * uplink; // owned; task->driver pipe
+      
+    // Scratch buffers reused for every received key/value pair.
+    string key;
+    string value;
+   
+  public:
+    BinaryProtocol(FILE* down, DownwardProtocol* _handler, FILE* up) {
+      downStream = new FileInStream();
+      downStream->open(down);
+      uplink = new BinaryUpwardProtocol(up);
+      handler = _handler;
+      
+      //authDone = false;
+      //getPassword(password);
+    }
+
+    UpwardProtocol* getUplink() {
+      return uplink;
+    }
+
+    /**
+     * Block until one command is available, deserialize it and dispatch it
+     * to the handler. Called in a loop by the task's event pump.
+     */
+    virtual void nextEvent() {
+      int32_t cmd;
+      cmd = deserializeInt(*downStream);
+        
+     switch (cmd) {
+            
+      case START_MESSAGE: {
+        int32_t prot;
+        prot = deserializeInt(*downStream);
+        if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got START_MESSAGE prot: %d\n", prot); 
+        handler->start(prot);
+        break;
+      }
+      /* SET BSP Job Configuration / Environment */
+      case SET_BSPJOB_CONF: {
+        int32_t entries;
+        entries = deserializeInt(*downStream);
+        if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SET_BSPJOB_CONF entries: %d\n", entries); 
+        // BUGFIX: this was 'vector<string> result(entries*2)', which
+        // default-constructs entries*2 empty strings; the push_back calls
+        // below then grew the vector to twice the expected size with bogus
+        // empty key/value pairs at the front. reserve() keeps the single
+        // up-front allocation without the padding.
+        vector<string> result;
+        result.reserve(entries*2);
+        for(int i=0; i < entries*2; ++i) {
+          string item;
+          deserializeString(item, *downStream);
+          result.push_back(item);
+          if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SET_BSPJOB_CONF add NewEntry: %s\n", item.c_str()); 
+        }
+        if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got all Configuration %d entries!\n", entries);
+        handler->setBSPJob(result);
+        break;
+      }
+      case SET_INPUT_TYPES: {
+        string keyType;
+        string valueType;
+        deserializeString(keyType, *downStream);
+        deserializeString(valueType, *downStream);
+        if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SET_INPUT_TYPES keyType: %s valueType: %s\n",
+                keyType.c_str(),valueType.c_str()); 
+        handler->setInputTypes(keyType, valueType);
+        break;
+      }
+      /* Response to a READ_KEYVALUE request: one key/value pair. */
+      case READ_KEYVALUE: {
+        deserializeString(key, *downStream);
+        deserializeString(value, *downStream);
+        if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got READ_KEYVALUE key: %s value: %s\n",
+                key.c_str(),
+                ((value.length()<10)?value.c_str():value.substr(0,9).c_str()) ); 
+        handler->setKeyValue(key, value);
+        break;
+      }
+      /* Lifecycle commands: each carries two ints interpreted as booleans. */
+      case RUN_SETUP: {
+        if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got RUN_SETUP\n"); 
+        int32_t pipedInput;
+        int32_t pipedOutput;
+        pipedInput = deserializeInt(*downStream);
+        pipedOutput = deserializeInt(*downStream);
+        handler->runSetup(pipedInput, pipedOutput);
+        break;
+      }
+      case RUN_BSP: {
+        if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got RUN_BSP\n"); 
+        int32_t pipedInput;
+        int32_t pipedOutput;
+        pipedInput = deserializeInt(*downStream);
+        pipedOutput = deserializeInt(*downStream);
+        handler->runBsp(pipedInput, pipedOutput);
+        break;
+      }
+      case RUN_CLEANUP: {
+        if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got RUN_CLEANUP\n"); 
+        int32_t pipedInput;
+        int32_t pipedOutput;
+        pipedInput = deserializeInt(*downStream);
+        pipedOutput = deserializeInt(*downStream);
+        handler->runCleanup(pipedInput, pipedOutput);
+        break;
+      }
+      
+      /* Driver asks the task to partition one record across numTasks. */
+      case PARTITION_REQUEST: {
+        if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got PARTITION_REQUEST\n"); 
+        string partitionKey;
+        string partitionValue;
+        int32_t numTasks;
+        deserializeString(partitionKey, *downStream);
+        deserializeString(partitionValue, *downStream);
+        numTasks = deserializeInt(*downStream);
+        handler->runPartition(partitionKey, partitionValue, numTasks);
+        break;
+      }
+
+      /* Responses to upward requests: forwarded via the matching
+       * setNewResult overload, which flips the corresponding flag that a
+       * blocked getter is polling on. */
+      case GET_MSG_COUNT: {
+        int32_t msgCount = deserializeInt(*downStream);
+        if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_MSG_COUNT msgCount: %d\n",msgCount); 
+        handler->setNewResult(msgCount);
+        break;
+      }
+      case GET_MSG: {
+        string msg;
+        deserializeString(msg,*downStream);
+        if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_MSG msg: %s\n",msg.c_str());
+        handler->setNewResult(msg);
+        break;
+      }
+      case GET_PEERNAME: {
+        string peername;
+        deserializeString(peername,*downStream);
+        if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_PEERNAME peername: %s\n",peername.c_str());
+        handler->setNewResult(peername);
+        break;
+      }
+      case GET_ALL_PEERNAME: {
+        vector<string> peernames;
+        int32_t peernameCount = deserializeInt(*downStream);
+        if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_ALL_PEERNAME peernameCount: %d\n",peernameCount);
+        string peername;
+        for (int i=0; i<peernameCount; i++)  {
+          deserializeString(peername,*downStream);
+          peernames.push_back(peername);
+          if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_ALL_PEERNAME peername: %s\n",peername.c_str());
+        }
+        handler->setNewResult(peernames);
+        break;
+      }
+      case GET_PEER_INDEX: {
+        int32_t peerIndex = deserializeInt(*downStream);
+        if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_PEER_INDEX peerIndex: %d\n",peerIndex); 
+        handler->setNewResult(peerIndex);
+        break;
+      }
+      case GET_PEER_COUNT: {
+        int32_t peerCount = deserializeInt(*downStream);
+        if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_PEER_COUNT peerCount: %d\n",peerCount); 
+        handler->setNewResult(peerCount);
+        break;
+      }
+      case GET_SUPERSTEP_COUNT: {
+        int64_t superstepCount = deserializeLong(*downStream);
+        if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_SUPERSTEP_COUNT superstepCount: %ld\n",(long)superstepCount); 
+        handler->setNewResult(superstepCount);
+        break;
+      }
+             
+      /* SequenceFile responses. */
+      case SEQFILE_OPEN: {
+        int32_t fileID = deserializeInt(*downStream);
+        if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SEQFILE_OPEN fileID: %d\n",fileID); 
+        handler->setNewResult(fileID);
+        break;
+      }    
+      case SEQFILE_READNEXT: {
+        deserializeString(key, *downStream);
+        deserializeString(value, *downStream);
+        if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SEQFILE_READNEXT key: %s value: %s\n", 
+                key.c_str(),
+                ((value.length()<10)?value.c_str():value.substr(0,9).c_str()) ); 
+        handler->setKeyValue(key, value);
+        break;
+      }
+      case SEQFILE_APPEND: {
+          int32_t result = deserializeInt(*downStream);
+          if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SEQFILE_APPEND result: %d\n",result);
+          handler->setNewResult(result);
+          break;
+      }   
+      case SEQFILE_CLOSE: {
+          int32_t result = deserializeInt(*downStream);
+          if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SEQFILE_CLOSE result: %d\n",result);
+          handler->setNewResult(result);
+          break;
+      }
+             
+      /* Shutdown. */
+      case CLOSE: {
+        if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got CLOSE\n"); 
+        handler->close();
+        break;
+      }
+      case ABORT: {
+        if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got ABORT\n"); 
+        handler->abort();
+        break;
+      }        
+      default:
+        // BUGFIX: log before asserting — HADOOP_ASSERT throws, so the
+        // fprintf that used to follow it was unreachable.
+        fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - Unknown binary command: %d\n",cmd); 
+        HADOOP_ASSERT(false, "Unknown binary command " + toString(cmd));
+      }
+     }
+      
+    virtual ~BinaryProtocol() {
+      // handler is not owned and is intentionally not deleted here.
+      delete downStream;
+      delete uplink;
+    }
+  };
+
+  /********************************************/
+  /************** BSPContextImpl **************/  
+  /********************************************/
+  class BSPContextImpl: public BSPContext, public DownwardProtocol {
+  private:
+    bool done;
+    BSPJob* job;
+    //string key;
+    //const string* newKey;
+    //const string* value;
+    bool hasTask;
+    //bool isNewKey;
+    //bool isNewValue;
+    string* inputKeyClass;
+    string* inputValueClass;
+    
+    //string status;
+    //float progressFloat;
+    //uint64_t lastProgress;
+    //bool statusSet;
+      
+    Protocol* protocol;
+    UpwardProtocol *uplink;
+    
+    //string* inputSplit;
+    
+    RecordReader* reader;
+    RecordWriter* writer;
+      
+    BSP* bsp;
+    Partitioner* partitioner;
+    
+    const Factory* factory;
+    pthread_mutex_t mutexDone;
+    std::vector<int> registeredCounterIds;
+      
+    int32_t resultInt;
+    bool isNewResultInt;  
+    int64_t resultLong;
+    bool isNewResultLong; 
+    string resultString;
+    bool isNewResultString;   
+    vector<string> resultVector;
+    bool isNewResultVector; 
+    
+    bool isNewKeyValuePair;  
+    string currentKey;
+    string currentValue;
+
+  public:
+
+    /**
+     * Initializes all members to their idle state. The factory is merely
+     * borrowed (not owned); protocol/uplink are wired up later, outside
+     * the constructor.
+     */
+    BSPContextImpl(const Factory& _factory) {
+      //statusSet = false;
+      done = false;
+      //newKey = NULL;
+      factory = &_factory;
+      job = NULL;
+        
+      inputKeyClass = NULL;
+      inputValueClass = NULL;
+      
+      //inputSplit = NULL;
+      
+      bsp = NULL;
+      reader = NULL;
+      writer = NULL;
+      partitioner = NULL;
+      protocol = NULL;
+      // BUGFIX: uplink was never initialized here; a read before it is
+      // assigned would dereference an indeterminate pointer.
+      uplink = NULL;
+      //isNewKey = false;
+      //isNewValue = false;
+      //lastProgress = 0;
+      //progressFloat = 0.0f;
+      hasTask = false;
+      pthread_mutex_init(&mutexDone, NULL);
+        
+      // BUGFIX: isNewResultLong was never initialized (it is polled by
+      // getSuperstepCount(), which could therefore return garbage), and the
+      // original used a stray ',' comma operator instead of ';'.
+      isNewResultInt = false;
+      isNewResultLong = false;
+      isNewResultString = false;
+      isNewResultVector = false;
+        
+      isNewKeyValuePair = false;
+    }
+
+  
+    /********************************************/
+    /*********** DownwardProtocol IMPL **********/  
+    /********************************************/
+    /** Handshake: only protocol version 0 is accepted; also asks the
+     *  factory for a partitioner (may be NULL if none is provided). */
+    virtual void start(int protocol) {
+      if (protocol != 0) {
+        throw Error("Protocol version " + toString(protocol) + 
+                    " not supported");
+      }
+      partitioner = factory->createPartitioner(*this);
+    }
+
+    /** Build the job configuration from alternating key/value strings.
+     *  NOTE(review): any previously set job object is overwritten without
+     *  being freed — confirm this is only ever called once per task. */
+    virtual void setBSPJob(vector<string> values) {
+      int len = values.size();
+      BSPJobImpl* result = new BSPJobImpl();
+      HADOOP_ASSERT(len % 2 == 0, "Odd length of job conf values");
+      for(int i=0; i < len; i += 2) {
+        result->set(values[i], values[i+1]);
+      }
+      job = result;
+    }
+
+    /** Record the class names of the input key/value types. */
+    virtual void setInputTypes(string keyType, string valueType) {
+      inputKeyClass = new string(keyType);
+      inputValueClass = new string(valueType);
+    }
+      
+    /** Store one received key/value pair and flag it for the blocked
+     *  reader (readNext / sequenceFileReadNext). */
+    virtual void setKeyValue(const string& _key, const string& _value) {
+      currentKey = _key;
+      currentValue = _value;
+      isNewKeyValuePair = true;
+    }
+     
+    /* private Method */
+    /* Lazily create the RecordReader/RecordWriter on first use.
+     * NOTE(review): the HADOOP_ASSERT below is only reached when pipedInput
+     * is true, yet it then requires reader == NULL (i.e. the factory must
+     * NOT have supplied one) — the polarity looks inverted relative to the
+     * guard that just created the reader. Confirm the intended semantics
+     * against the equivalent code in Hadoop Pipes. */
+    void setupReaderWriter(bool pipedInput, bool pipedOutput) {
+        
+      if(logging)fprintf(stderr,"HamaPipes::BSPContextImpl::setupReaderWriter - pipedInput: %s pipedOutput: %s\n",
+              (pipedInput)?"true":"false",(pipedOutput)?"true":"false");
+
+      if (pipedInput && reader==NULL) {
+        reader = factory->createRecordReader(*this);
+        HADOOP_ASSERT((reader == NULL) == pipedInput,
+                      pipedInput ? "RecordReader defined when not needed.":
+                      "RecordReader not defined");
+        
+        //if (reader != NULL) {
+        //    value = new string();
+        //}
+      }  
+        
+      if (pipedOutput && writer==NULL) {
+        writer = factory->createRecordWriter(*this);
+        HADOOP_ASSERT((writer == NULL) == pipedOutput,
+                      pipedOutput ? "RecordWriter defined when not needed.":
+                      "RecordWriter not defined");
+      }
+    }
+      
+    /** Run the user's setup() and acknowledge with TASK_DONE.
+     *  Assumes uplink has been wired up before any run* command arrives. */
+    virtual void runSetup(bool pipedInput, bool pipedOutput) {
+      setupReaderWriter(pipedInput,pipedOutput);
+      
+      if (bsp == NULL)  
+        bsp = factory->createBSP(*this);
+        
+      if (bsp != NULL) {
+        hasTask = true;
+        bsp->setup(*this);
+        hasTask = false;
+        uplink->sendCMD(TASK_DONE);
+      }
+    }
+      
+    /** Run the user's bsp() superstep function and acknowledge with TASK_DONE. */
+    virtual void runBsp(bool pipedInput, bool pipedOutput) {
+      setupReaderWriter(pipedInput,pipedOutput);
+
+      if (bsp == NULL)  
+          bsp = factory->createBSP(*this);
+
+      if (bsp != NULL) {
+        hasTask = true;
+        bsp->bsp(*this);
+        hasTask = false;
+        uplink->sendCMD(TASK_DONE);
+      }
+    }
+      
+    /** Run the user's cleanup() and acknowledge with TASK_DONE.
+     *  Unlike setup/bsp, does not create the BSP object if it is missing. */
+    virtual void runCleanup(bool pipedInput, bool pipedOutput) {
+      setupReaderWriter(pipedInput,pipedOutput);
+        
+      if (bsp != NULL) {
+        hasTask = true;
+        bsp->cleanup(*this);
+        hasTask = false;
+        uplink->sendCMD(TASK_DONE);
+      }
+    }
+      
+    /********************************************/
+    /*******       Partitioner            *******/  
+    /********************************************/ 
+    /** Answer a PARTITION_REQUEST by asking the user's partitioner for the
+     *  target task index and replying with PARTITION_RESPONSE. A missing
+     *  partitioner is logged and silently ignored (no response is sent). */
+    virtual void runPartition(const string& key, const string& value, int32_t numTasks){
+      if (partitioner != NULL) {             
+        int part = partitioner->partition(key, value, numTasks);
+        uplink->sendCMD(PARTITION_RESPONSE, part);
+      } else {
+        if(logging)fprintf(stderr,"HamaPipes::BSPContextImpl::runPartition Partitioner is NULL!\n");
+      }
+    } 
+                          
+    /* setNewResult overloads: store a driver response and raise the flag
+     * that the matching blocked getter is polling on. */
+    virtual void setNewResult(int32_t _value) {
+      resultInt = _value;
+      isNewResultInt = true;  
+    }
+
+    virtual void setNewResult(int64_t _value) {
+      resultLong = _value;
+      isNewResultLong = true;  
+    }
+      
+    virtual void setNewResult(const string& _value) {
+      resultString = _value;
+      isNewResultString = true;   
+    }
+
+    virtual void setNewResult(vector<string> _value) {
+      resultVector = _value;
+      isNewResultVector = true;    
+    }
+
+    /** Mark the task as finished; guarded by mutexDone because the done
+     *  flag is read from another thread. */
+    virtual void close() {
+      pthread_mutex_lock(&mutexDone);
+      done = true;
+      hasTask = false;
+      pthread_mutex_unlock(&mutexDone);
+      if(logging)fprintf(stderr,"HamaPipes::BSPContextImpl::close - done: %s hasTask: %s\n",
+                (done)?"true":"false",(hasTask)?"true":"false");
+    }
+      
+    /** Driver-requested abort: escalate as an exception. */
+    virtual void abort() {
+      throw Error("Aborted by driver");
+    }
+
+    /********************************************/
+    /************** TaskContext IMPL ************/  
+    /********************************************/
+    
+    /**
+     * Get the BSPJob for the current task.
+     * NULL until SET_BSPJOB_CONF has been received.
+     */
+    virtual const BSPJob* getBSPJob() {
+      return job;
+    }
+
+    /**
+     * Get the current key. 
+     * @return the current key or NULL if called before the first map or reduce
+     */
+    //virtual const string& getInputKey() {
+    //  return key;
+    //}
+
+    /**
+     * Get the current value. 
+     * @return the current value or NULL if called before the first map or 
+     *    reduce
+     */
+    //virtual const string& getInputValue() {
+    //  return *value;
+    //}
+      
+    /**
+     * Register a counter with the given group and name.
+     */
+    /*
+    virtual Counter* getCounter(const std::string& group, 
+                                  const std::string& name) {
+        int id = registeredCounterIds.size();
+        registeredCounterIds.push_back(id);
+        uplink->registerCounter(id, group, name);
+        return new Counter(id);
+    }*/
+      
+    /**
+     * Increment the value of the counter with the given amount.
+     * Fire-and-forget: delegated straight to the uplink.
+     */
+    virtual void incrementCounter(const string& group, const string& name, uint64_t amount)  {
+        uplink->incrementCounter(group, name, amount); 
+    }
+      
+    /********************************************/
+    /************** BSPContext IMPL *************/  
+    /********************************************/
+      
+    /**
+     * Access the InputSplit of the bsp.
+     */
+    //virtual const string& getInputSplit() {
+    //  return *inputSplit;
+    //}
+      
+    /**
+     * Get the name of the key class of the input to this task.
+     * NOTE: dereferences inputKeyClass — undefined behavior if called
+     * before SET_INPUT_TYPES has been received.
+     */
+    virtual const string& getInputKeyClass() {
+      return *inputKeyClass;
+    }
+
+    /**
+     * Get the name of the value class of the input to this task.
+     * NOTE: dereferences inputValueClass — undefined behavior if called
+     * before SET_INPUT_TYPES has been received.
+     */
+    virtual const string& getInputValueClass() {
+      return *inputValueClass;
+    }
+
+    /**
+     * Send a data with a tag to another BSPSlave corresponding to hostname.
+     * Messages sent by this method are not guaranteed to be received in a sent
+     * order.
+     */
+    virtual void sendMessage(const string& peerName, const string& msg) {
+        string values[] = {peerName, msg};
+        uplink->sendCMD(SEND_MSG,values, 2);
+    }
+      
+    /**
+     * @return A message from the peer's received messages queue (a FIFO).
+     *
+     * Blocking request/response: sends GET_MSG, then pumps protocol events
+     * until the matching setNewResult(string) flips isNewResultString.
+     * NOTE: the returned reference aliases the shared resultString member
+     * and is overwritten by the next string-returning request.
+     */
+    virtual const string& getCurrentMessage() {
+      uplink->sendCMD(GET_MSG);
+      
+      while (!isNewResultString)
+          protocol->nextEvent();
+        
+      isNewResultString = false;
+      if(logging)fprintf(stderr,"HamaPipes::BSPContextImpl::getMessage - NewResultString: %s\n",resultString.c_str());
+      return resultString;
+    }
+
+    /**
+     * @return The number of messages in the peer's received messages queue.
+     */
+    virtual int getNumCurrentMessages() {
+      uplink->sendCMD(GET_MSG_COUNT);
+        
+      while (!isNewResultInt)
+        protocol->nextEvent();
+      
+      isNewResultInt = false;
+      return resultInt;
+    }
+      
+    /**
+     * Barrier Synchronization.
+     * 
+     * Sends all the messages in the outgoing message queues to the corresponding
+     * remote peers.
+     */
+    virtual void sync() {
+      uplink->sendCMD(SYNC);
+    }
+    
+    /**
+     * @return the name of this peer in the format "hostname:port".
+     * The index -1 presumably selects this peer's own name on the driver
+     * side — confirm against the Java protocol implementation.
+     */ 
+    virtual const string& getPeerName() {
+      uplink->sendCMD(GET_PEERNAME,-1);
+    
+      while (!isNewResultString)
+        protocol->nextEvent();
+    
+      if(logging)fprintf(stderr,"HamaPipes::BSPContextImpl::getPeerName - NewResultString: %s\n",resultString.c_str());
+      isNewResultString = false;
+      return resultString;
+    }
+    
+    /**
+     * @return the name of n-th peer from sorted array by name.
+     */
+    virtual const string& getPeerName(int index) {
+      uplink->sendCMD(GET_PEERNAME,index);
+        
+      while (!isNewResultString)
+        protocol->nextEvent();
+  
+      if(logging)fprintf(stderr,"HamaPipes::BSPContextImpl::getPeerName - NewResultString: %s\n",resultString.c_str());
+      isNewResultString = false;
+      return resultString;
+    }
+    
+    /**
+     * @return the names of all the peers executing tasks from the same job
+     *         (including this peer).
+     */
+    virtual vector<string> getAllPeerNames() {
+      uplink->sendCMD(GET_ALL_PEERNAME);
+        
+      while (!isNewResultVector)
+        protocol->nextEvent();
+        
+      isNewResultVector = false;
+      return resultVector;
+    }
+    
+    /**
+     * @return the index of this peer from sorted array by name.
+     */
+    virtual int getPeerIndex() {
+      uplink->sendCMD(GET_PEER_INDEX);
+        
+      while (!isNewResultInt)
+        protocol->nextEvent();
+        
+      isNewResultInt = false;
+      return resultInt;
+    }
+      
+    /**
+     * @return the number of peers
+     */
+    virtual int getNumPeers() {
+      uplink->sendCMD(GET_PEER_COUNT);
+        
+      while (!isNewResultInt)
+        protocol->nextEvent();
+        
+      isNewResultInt = false;
+      return resultInt;       
+    }
+      
+    /**
+     * @return the count of current super-step
+     */
+    virtual long getSuperstepCount() {
+      uplink->sendCMD(GET_SUPERSTEP_COUNT);
+        
+      while (!isNewResultLong)
+        protocol->nextEvent();
+        
+      isNewResultLong = false;
+      return resultLong;     
+    }  
+    
+    /**
+     * Clears all queues entries.
+     */
+    virtual void clear() {
+      uplink->sendCMD(CLEAR);
+    }
+
+    /**
+     * Writes a key/value pair to the output collector
+     */
+    virtual void write(const string& key, const string& value) {
+        if (writer != NULL) {
+            writer->emit(key, value);
+        } else {
+            string values[] = {key, value};
+            uplink->sendCMD(WRITE_KEYVALUE, values, 2);
+        }
+    }
+    
+    /**
+     * Deserializes the next input key value into the given objects;
+     */
+    virtual bool readNext(string& _key, string& _value) {
+      uplink->sendCMD(READ_KEYVALUE);
+        
+      while (!isNewKeyValuePair)
+        protocol->nextEvent();
+      
+      isNewKeyValuePair = false;
+        
+      _key = currentKey;
+      _value = currentValue;
+      
+      if (logging && _key.empty() && _value.empty())  
+        fprintf(stderr,"HamaPipes::BSPContextImpl::readNext - Empty KeyValuePair\n");
+        
+      return (!_key.empty() && !_value.empty());
+    }
+       
+    /**
+     * Closes the input and opens it right away, so that the file pointer is at
+     * the beginning again.
+     */
+    virtual void reopenInput() {
+      uplink->sendCMD(REOPEN_INPUT);
+    }
+      
+      
+    /********************************************/
+    /*******  SequenceFileConnector IMPL  *******/  
+    /********************************************/     
+      
+    /**
+     * Open SequenceFile with option "r" or "w".
+     * The parent replies with an integer handle used by the other
+     * sequenceFile* calls.
+     *
+     * @return the corresponding fileID, or -1 if option was invalid
+     */
+    virtual int sequenceFileOpen(const string& path, const string& option, 
+                                 const string& keyType, const string& valueType) {
+      if (logging)  
+        fprintf(stderr,"HamaPipes::BSPContextImpl::sequenceFileOpen - Path: %s\n",path.c_str());
+     
+      if ( (option.compare("r")==0) || (option.compare("w")==0))  {
+          
+          string values[] = {path, option, keyType, valueType};
+          uplink->sendCMD(SEQFILE_OPEN,values, 4);
+      
+          // Wait for the integer file handle from the parent.
+          while (!isNewResultInt)
+            protocol->nextEvent();
+        
+          isNewResultInt = false;
+          return resultInt;
+      } else { 
+          fprintf(stderr,"HamaPipes::BSPContextImpl::sequenceFileOpen wrong option: %s!\n",option.c_str());
+          return -1; //Error wrong option
+      }
+    }
+
+    /**
+     * Read next key/value pair from the SequenceFile with fileID.
+     * An empty key and value signals end-of-file (see readNext).
+     */
+    virtual bool sequenceFileReadNext(int fileID, string& _key, string& _value) {
+        
+      uplink->sendCMD(SEQFILE_READNEXT,fileID);
+        
+      while (!isNewKeyValuePair)
+        protocol->nextEvent();
+        
+      isNewKeyValuePair = false;
+        
+      _key = currentKey;
+      _value = currentValue;
+        
+      if (logging && _key.empty() && _value.empty())  
+        fprintf(stderr,"HamaPipes::BSPContextImpl::sequenceFileReadNext - Empty KeyValuePair\n");
+        
+      return (!_key.empty() && !_value.empty());
+    }
+
+    /**
+     * Append the next key/value pair to the SequenceFile with fileID.
+     *
+     * @return true if the parent acknowledged the append with 1
+     */
+    virtual bool sequenceFileAppend(int fileID, const string& key, const string& value) {
+      string values[] = {key, value};
+      uplink->sendCMD(SEQFILE_APPEND,fileID, values, 2);
+                
+      while (!isNewResultInt)
+        protocol->nextEvent();
+        
+      isNewResultInt = false;
+      return (resultInt==1);
+    }
+
+    /**
+     * Close SequenceFile.
+     *
+     * @return true if the parent reported a successful close (resultInt==1)
+     */
+    virtual bool sequenceFileClose(int fileID) {
+      uplink->sendCMD(SEQFILE_CLOSE,fileID);
+        
+      while (!isNewResultInt)
+        protocol->nextEvent();
+        
+      if (logging && resultInt==0)  
+        fprintf(stderr,"HamaPipes::BSPContextImpl::sequenceFileClose - Nothing was closed!\n");
+      else if (logging)
+        fprintf(stderr,"HamaPipes::BSPContextImpl::sequenceFileClose - File was successfully closed!\n");
+    
+      isNewResultInt = false;
+      return (resultInt==1);
+    }
+      
+    /********************************************/
+    /*************** Other STUFF  ***************/  
+    /********************************************/
+      
+    /**
+     * Wires up the downlink protocol and uplink after construction; must be
+     * called before any command is sent.
+     */
+    void setProtocol(Protocol* _protocol, UpwardProtocol* _uplink) {
+        protocol = _protocol;
+        uplink = _uplink;
+    }
+   
+    /**
+     * Thread-safe read of the done flag (guarded by mutexDone; the ping
+     * thread reads it concurrently).
+     */
+    bool isDone() {
+        pthread_mutex_lock(&mutexDone);
+        bool doneCopy = done;
+        pthread_mutex_unlock(&mutexDone);
+        return doneCopy;
+    }
+      
+    /**
+     * Advance to the next value.
+     */
+    /*
+    bool nextValue() {
+        if (isNewKey || done) {
+            return false;
+        }
+        isNewValue = false;
+        //progress();
+        protocol->nextEvent();
+        return isNewValue;
+    } 
+    */
+    /**
+     * Pumps protocol events until either a task is assigned (hasTask) or
+     * the parent signals shutdown (done).
+     */
+    void waitForTask() {
+        while (!done && !hasTask) {		
+            if(logging)fprintf(stderr,"HamaPipes::BSPContextImpl::waitForTask - done: %s hasTask: %s\n",
+                    (done)?"true":"false",(hasTask)?"true":"false");
+            protocol->nextEvent();
+        }
+    }
+    /*
+    bool nextKey() {
+        if (reader == NULL) {
+            while (!isNewKey) {
+                nextValue();
+                if (done) {
+                    return false;
+                }
+            }
+            key = *newKey;
+        } else {
+            if (!reader->next(key, const_cast<string&>(*value))) {
+                pthread_mutex_lock(&mutexDone);
+                done = true;
+                pthread_mutex_unlock(&mutexDone);
+                return false;
+            }
+            //progressFloat = reader->getProgress();
+        }
+        isNewKey = false;
+          
+        if (bsp != NULL) {
+            bsp->bsp(*this);
+        }
+        return true;
+    }
+   */
+    /**
+     * Closes the reader, the user's BSP implementation and the writer, in
+     * that order. Each is optional (NULL-checked).
+     */
+    void closeAll() {
+      if (reader) {
+        reader->close();
+      }
+      
+      if (bsp) {
+        bsp->close();
+      }
+     
+      if (writer) {
+        writer->close();
+      }
+    }
+      
+    /**
+     * Releases all owned objects and the done-flag mutex. Deleting NULL
+     * pointers is a no-op, so unset members are safe here.
+     */
+    virtual ~BSPContextImpl() {
+      delete job;
+      delete inputKeyClass;
+      delete inputValueClass;
+      //delete inputSplit;
+      //if (reader) {
+      //  delete value;
+      //}
+      delete reader;
+      delete bsp;
+      delete writer;
+      pthread_mutex_destroy(&mutexDone);
+    }
+  };
+  };
+
+  /**
+   * Ping the parent every 5 seconds to know if it is alive.
+   * Pthread entry point: opens a fresh TCP connection to the command port
+   * each round and closes it again; after MAX_RETRIES consecutive failures
+   * the process exits with status 1.
+   *
+   * @param ptr the BSPContextImpl* whose done flag terminates the loop
+   */
+  void* ping(void* ptr) {
+    BSPContextImpl* context = (BSPContextImpl*) ptr;
+    char* portStr = getenv("hama.pipes.command.port");
+    int MAX_RETRIES = 3;
+    int remaining_retries = MAX_RETRIES;
+    while (!context->isDone()) {
+      try{
+        sleep(5);
+        int sock = -1;
+        if (portStr) {
+          sock = socket(PF_INET, SOCK_STREAM, 0);
+          HADOOP_ASSERT(sock != - 1,
+                        string("problem creating socket: ") + strerror(errno));
+          sockaddr_in addr;
+          addr.sin_family = AF_INET;
+          addr.sin_port = htons(toInt(portStr));
+          addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+          if(logging)fprintf(stderr,"HamaPipes::ping - connected to GroomServer Port: %s\n", portStr);   
+          HADOOP_ASSERT(connect(sock, (sockaddr*) &addr, sizeof(addr)) == 0,
+                        string("problem connecting command socket: ") +
+                        strerror(errno));
+
+        }
+        if (sock != -1) {
+          // A successful connect counts as a successful ping; tear the
+          // socket down immediately.
+          int result = shutdown(sock, SHUT_RDWR);
+          HADOOP_ASSERT(result == 0, "problem shutting socket");
+          result = close(sock);
+          HADOOP_ASSERT(result == 0, "problem closing socket");
+        }
+        // Any success resets the retry budget.
+        remaining_retries = MAX_RETRIES;
+      } catch (Error& err) {
+        if (!context->isDone()) {
+          fprintf(stderr, "Hama Pipes Exception: in ping %s\n", 
+                err.getMessage().c_str());
+          remaining_retries -= 1;
+          if (remaining_retries == 0) {
+            exit(1);
+          }
+        } else {
+          return NULL;
+        }
+      }
+    }
+    return NULL;
+  }
+    
+  /**
+   * Run the assigned task in the framework.
+   * The user's main function should set the various functions using the 
+   * set* functions above and then call this.
+   *
+   * Connects to the parent either over a TCP socket
+   * (hama.pipes.command.port) or a command file (hama.pipes.command.file),
+   * pumps the protocol until the task is done, then tears everything down.
+   *
+   * @return true, if the task succeeded.
+   */
+  bool runTask(const Factory& factory) {
+    try {
+      HADOOP_ASSERT(getenv("hama.pipes.logging")!=NULL,"No environment found!");
+        
+      logging = (toInt(getenv("hama.pipes.logging"))==0)?false:true;  
+      if(logging)fprintf(stderr,"HamaPipes::runTask - logging is: %s\n", (logging)?"true":"false"); 
+        
+      BSPContextImpl* context = new BSPContextImpl(factory);
+      Protocol* connection;
+        
+      char* portStr = getenv("hama.pipes.command.port");
+      int sock = -1;
+      FILE* stream = NULL;
+      FILE* outStream = NULL;
+      char *bufin = NULL;
+      char *bufout = NULL;
+      if (portStr) {
+        // Connect a TCP command socket to the parent on loopback.
+        sock = socket(PF_INET, SOCK_STREAM, 0);
+        HADOOP_ASSERT(sock != - 1,
+                      string("problem creating socket: ") + strerror(errno));
+        sockaddr_in addr;
+        addr.sin_family = AF_INET;
+        addr.sin_port = htons(toInt(portStr));
+        addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+        HADOOP_ASSERT(connect(sock, (sockaddr*) &addr, sizeof(addr)) == 0,
+                      string("problem connecting command socket: ") +
+                      strerror(errno));
+
+        stream = fdopen(sock, "r");
+        outStream = fdopen(sock, "w");
+
+        // Increase the stdio buffer size to cut down on syscalls.
+        int bufsize = 128*1024;
+        int setbuf;
+        bufin = new char[bufsize];
+        bufout = new char[bufsize];
+        setbuf = setvbuf(stream, bufin, _IOFBF, bufsize);
+        HADOOP_ASSERT(setbuf == 0, string("problem with setvbuf for inStream: ")
+                                     + strerror(errno));
+        setbuf = setvbuf(outStream, bufout, _IOFBF, bufsize);
+        HADOOP_ASSERT(setbuf == 0, string("problem with setvbuf for outStream: ")
+                                     + strerror(errno));
+          
+        connection = new BinaryProtocol(stream, context, outStream);
+        if(logging)fprintf(stderr,"HamaPipes::runTask - connected to GroomServer Port: %s\n", portStr);  
+          
+      } else if (getenv("hama.pipes.command.file")) {
+        // File-based transport: read commands from <file>, write replies
+        // to <file>.out.
+        char* filename = getenv("hama.pipes.command.file");
+        string outFilename = filename;
+        outFilename += ".out";
+        stream = fopen(filename, "r");
+        outStream = fopen(outFilename.c_str(), "w");
+        connection = new BinaryProtocol(stream, context, outStream);
+      } else {
+        //connection = new TextProtocol(stdin, context, stdout);
+        fprintf(stderr,"HamaPipes::runTask - Connection couldn't be initialized!\n");
+        // BUGFIX: this function returns bool; the previous "return -1"
+        // converted to true and reported SUCCESS on a connection failure.
+        delete context;
+        return false;
+      }
+ 
+      context->setProtocol(connection, connection->getUplink());
+        
+      //pthread_t pingThread;
+      //pthread_create(&pingThread, NULL, ping, (void*)(context));
+      
+      context->waitForTask();
+        
+      //while (!context->isDone()) {
+        //context->nextKey();
+      //}
+        
+      context->closeAll();
+      connection->getUplink()->sendCMD(DONE);
+      
+      //pthread_join(pingThread,NULL);
+      
+      delete context;
+      delete connection;
+      if (stream != NULL) {
+        fflush(stream);
+      }
+      if (outStream != NULL) {
+        fflush(outStream);
+      }
+      fflush(stdout);
+      if (sock != -1) {
+        int result = shutdown(sock, SHUT_RDWR);
+        HADOOP_ASSERT(result == 0, "problem shutting socket");
+        result = close(sock);
+        HADOOP_ASSERT(result == 0, "problem closing socket");
+      }
+      if (stream != NULL) {
+        //fclose(stream);
+      }
+      if (outStream != NULL) {
+        //fclose(outStream);
+      } 
+      // BUGFIX: bufin/bufout were allocated with new[], so they must be
+      // released with delete[] (plain delete on an array is undefined
+      // behavior).
+      delete [] bufin;
+      delete [] bufout;
+      return true;
+    } catch (Error& err) {
+      fprintf(stderr, "Hama Pipes Exception: %s\n", 
+              err.getMessage().c_str());
+      return false;
+    }
+  }
+}
+

Added: hama/trunk/c++/src/main/native/utils/api/hadoop/SerialUtils.hh
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/main/native/utils/api/hadoop/SerialUtils.hh?rev=1518963&view=auto
==============================================================================
--- hama/trunk/c++/src/main/native/utils/api/hadoop/SerialUtils.hh (added)
+++ hama/trunk/c++/src/main/native/utils/api/hadoop/SerialUtils.hh Fri Aug 30 13:26:00 2013
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef HADOOP_SERIAL_UTILS_HH
+#define HADOOP_SERIAL_UTILS_HH
+
+#include <string>
+#include <stdint.h>
+
+namespace HadoopUtils {
+
+  /**
+   * A simple exception class that records a message for the user.
+   */
+  class Error {
+  private:
+    std::string error;
+  public:
+
+    /**
+     * Create an error object with the given message.
+     */
+    Error(const std::string& msg);
+
+    /**
+     * Construct an error object with the given message that was created on
+     * the given file, line, and function.
+     */
+    Error(const std::string& msg, 
+          const std::string& file, int line, const std::string& function);
+
+    /**
+     * Get the error message.
+     */
+    const std::string& getMessage() const;
+  };
+
+  /**
+   * Check to make sure that the condition is true, and throw an exception
+   * if it is not. The exception will contain the message and a description
+   * of the source location.
+   */
+  #define HADOOP_ASSERT(CONDITION, MESSAGE) \
+    { \
+      if (!(CONDITION)) { \
+        throw HadoopUtils::Error((MESSAGE), __FILE__, __LINE__, \
+                                    __PRETTY_FUNCTION__); \
+      } \
+    }
+
+  /**
+   * An interface for an input stream.
+   */
+  class InStream {
+  public:
+    /**
+     * Reads len bytes from the stream into the buffer.
+     * @param buf the buffer to read into
+     * @param len the number of bytes to read
+     * @throws Error if there are problems reading
+     */
+    virtual void read(void *buf, size_t len) = 0;
+    virtual ~InStream() {}
+  };
+
+  /**
+   * An interface for an output stream.
+   */
+  class OutStream {
+  public:
+    /**
+     * Write the given buffer to the stream.
+     * @param buf the data to write
+     * @param len the number of bytes to write
+     * @throws Error if there are problems writing
+     */
+    virtual void write(const void *buf, size_t len) = 0;
+    /**
+     * Flush the data to the underlying store.
+     */
+    virtual void flush() = 0;
+    virtual ~OutStream() {}
+  };
+
+  /**
+   * A class to read a file as a stream.
+   */
+  class FileInStream : public InStream {
+  public:
+    FileInStream();
+    bool open(const std::string& name);
+    bool open(FILE* file);
+    void read(void *buf, size_t buflen);
+    bool skip(size_t nbytes);
+    bool close();
+    virtual ~FileInStream();
+  private:
+    /**
+     * The file to read from.
+     */
+    FILE *mFile;
+    /**
+     * Is this class responsible for closing the FILE*?
+     * True when opened by name, false when wrapping a caller-owned FILE*.
+     */
+    bool isOwned;
+  };
+
+  /**
+   * A class to write a stream to a file.
+   */
+  class FileOutStream: public OutStream {
+  public:
+
+    /**
+     * Create a stream that isn't bound to anything.
+     */
+    FileOutStream();
+
+    /**
+     * Create the given file, potentially overwriting an existing file.
+     */
+    bool open(const std::string& name, bool overwrite);
+    bool open(FILE* file);
+    void write(const void* buf, size_t len);
+    bool advance(size_t nbytes);
+    void flush();
+    bool close();
+    virtual ~FileOutStream();
+  private:
+    // The file to write to, and whether this object owns (and must close)
+    // it; mirrors the FileInStream members.
+    FILE *mFile;
+    bool isOwned;
+  };
+
+  /**
+   * A stream that reads from a string.
+   * The string is held by reference, so it must outlive this stream.
+   */
+  class StringInStream: public InStream {
+  public:
+    StringInStream(const std::string& str);
+    virtual void read(void *buf, size_t buflen);
+  private:
+    const std::string& buffer;
+    std::string::const_iterator itr;
+  };
+
+  // Hadoop Writable-compatible serialization helpers; implementations live
+  // in SerialUtils.cc.
+  void serializeInt(int32_t t, OutStream& stream);
+  int32_t deserializeInt(InStream& stream);
+  void serializeLong(int64_t t, OutStream& stream);
+  int64_t deserializeLong(InStream& stream);
+  void serializeFloat(float t, OutStream& stream);
+  // BUGFIX: previously declared as "float deserializeFloat(InStream&)",
+  // which has no definition (SerialUtils.cc defines the out-parameter
+  // form below) and would fail at link time for any caller.
+  void deserializeFloat(float& t, InStream& stream);
+  void serializeString(const std::string& t, OutStream& stream);
+  void deserializeString(std::string& t, InStream& stream);
+}
+
+#endif

Added: hama/trunk/c++/src/main/native/utils/api/hadoop/Splitter.hh
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/main/native/utils/api/hadoop/Splitter.hh?rev=1518963&view=auto
==============================================================================
--- hama/trunk/c++/src/main/native/utils/api/hadoop/Splitter.hh (added)
+++ hama/trunk/c++/src/main/native/utils/api/hadoop/Splitter.hh Fri Aug 30 13:26:00 2013
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <string>
+#include <vector>
+
+namespace HadoopUtils {
+
+    /**
+     * Tokenizes a source string on a delimiter and gives indexed access to
+     * the resulting tokens.
+     */
+    class Splitter {
+      private:
+        std::vector<std::string> _tokens;
+      public:
+        typedef std::vector<std::string>::size_type size_type;
+      public:
+        // Splits src on delim immediately.
+        Splitter ( const std::string& src, const std::string& delim );
+        
+        // Access to the i-th token.
+        std::string& operator[] ( size_type i );
+        
+        // Number of tokens produced by the last split.
+        size_type size(); 
+        
+        // Discards the current tokens and re-splits from a new source.
+        void reset ( const std::string& src, const std::string& delim );
+    };
+}

Added: hama/trunk/c++/src/main/native/utils/api/hadoop/StringUtils.hh
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/main/native/utils/api/hadoop/StringUtils.hh?rev=1518963&view=auto
==============================================================================
--- hama/trunk/c++/src/main/native/utils/api/hadoop/StringUtils.hh (added)
+++ hama/trunk/c++/src/main/native/utils/api/hadoop/StringUtils.hh Fri Aug 30 13:26:00 2013
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef HADOOP_STRING_UTILS_HH
+#define HADOOP_STRING_UTILS_HH
+
+#include <stdint.h>
+#include <string>
+#include <vector>
+
+namespace HadoopUtils {
+
+  /**
+   * Added by Apache Hama Pipes
+   * Convert a double to a string.
+   */
+  std::string toString(double d);
+    
+  /**
+   * Added by Apache Hama Pipes
+   * Convert the string to a double.
+   * @throws Error if the string is not a valid double
+   */
+  double toDouble(const std::string& val);
+
+  /**
+   * Convert an integer to a string.
+   */
+  std::string toString(int32_t x);
+
+  /**
+   * Convert a string to an integer.
+   * @throws Error if the string is not a valid integer
+   */
+  int32_t toInt(const std::string& val);
+
+  /**
+   * Convert the string to a float.
+   * @throws Error if the string is not a valid float
+   */
+  float toFloat(const std::string& val);
+
+  /**
+   * Convert the string to a boolean.
+   * @throws Error if the string is not a valid boolean value
+   */
+  bool toBool(const std::string& val);
+
+  /**
+   * Get the current time in the number of milliseconds since 1970.
+   */
+  uint64_t getCurrentMillis();
+
+  /**
+   * Split a string into "words". Multiple deliminators are treated as a single
+   * word break, so no zero-length words are returned.
+   * @param str the string to split
+   * @param separator a list of characters that divide words
+   */
+  std::vector<std::string> splitString(const std::string& str,
+                                       const char* separator);
+
+  /**
+   * Quote a string to avoid "\", non-printable characters, and the 
+   * deliminators.
+   * @param str the string to quote
+   * @param deliminators the set of characters to always quote
+   */
+  std::string quoteString(const std::string& str,
+                          const char* deliminators);
+
+  /**
+   * Unquote the given string to return the original string.
+   * @param str the string to unquote
+   */
+  std::string unquoteString(const std::string& str);
+
+}
+
+#endif

Added: hama/trunk/c++/src/main/native/utils/impl/SerialUtils.cc
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/main/native/utils/impl/SerialUtils.cc?rev=1518963&view=auto
==============================================================================
--- hama/trunk/c++/src/main/native/utils/impl/SerialUtils.cc (added)
+++ hama/trunk/c++/src/main/native/utils/impl/SerialUtils.cc Fri Aug 30 13:26:00 2013
@@ -0,0 +1,294 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "hadoop/SerialUtils.hh"
+#include "hadoop/StringUtils.hh"
+
+#include <errno.h>
+#include <rpc/types.h>
+#include <rpc/xdr.h>
+#include <string>
+#include <string.h>
+
+using std::string;
+
+namespace HadoopUtils {
+
+  // Plain-message constructor: stores the message verbatim.
+  Error::Error(const std::string& msg): error(msg) {
+  }
+
+  // Source-location constructor (used by HADOOP_ASSERT): appends
+  // "at file:line in function" to the message.
+  Error::Error(const std::string& msg, 
+               const std::string& file, int line, 
+               const std::string& function) {
+    error = msg + " at " + file + ":" + toString(line) + 
+            " in " + function;
+  }
+
+  const std::string& Error::getMessage() const {
+    return error;
+  }
+
+  FileInStream::FileInStream()
+  {
+    mFile = NULL;
+    isOwned = false;
+  }
+
+  // Open by name: this stream owns the FILE* and closes it itself.
+  bool FileInStream::open(const std::string& name)
+  {
+    mFile = fopen(name.c_str(), "rb");
+    isOwned = true;
+    return (mFile != NULL);
+  }
+
+  // Wrap a caller-owned FILE*: the caller remains responsible for closing.
+  bool FileInStream::open(FILE* file)
+  {
+    mFile = file;
+    isOwned = false;
+    return (mFile != NULL);
+  }
+
+  // Reads exactly len bytes; a short read is fatal and raises Error via
+  // HADOOP_ASSERT (distinguishing EOF from an I/O error in the message).
+  void FileInStream::read(void *buf, size_t len)
+  {
+    size_t result = fread(buf, len, 1, mFile);
+    if (result == 0) {
+      if (feof(mFile)) {
+        HADOOP_ASSERT(false, "end of file");
+      } else {
+        HADOOP_ASSERT(false, string("read error on file: ") + strerror(errno));
+      }
+    }
+  }
+
+  // Advance the file position by nbytes without reading.
+  bool FileInStream::skip(size_t nbytes)
+  {
+    return (0==fseek(mFile, nbytes, SEEK_CUR));
+  }
+
+  // Closes the FILE* only if this stream owns it; always clears mFile.
+  bool FileInStream::close()
+  {
+    int ret = 0;
+    if (mFile != NULL && isOwned) {
+      ret = fclose(mFile);
+    }
+    mFile = NULL;
+    return (ret==0);
+  }
+
+  FileInStream::~FileInStream()
+  {
+    if (mFile != NULL) {
+      close();
+    }
+  }
+
+  FileOutStream::FileOutStream()
+  {
+    mFile = NULL;
+    isOwned = false;
+  }
+
+  // Create the file; when overwrite is false, an existing file makes the
+  // open fail (existence is probed by attempting to open it for reading).
+  bool FileOutStream::open(const std::string& name, bool overwrite)
+  {
+    if (!overwrite) {
+      mFile = fopen(name.c_str(), "rb");
+      if (mFile != NULL) {
+        fclose(mFile);
+        return false;
+      }
+    }
+    mFile = fopen(name.c_str(), "wb");
+    isOwned = true;
+    return (mFile != NULL);
+  }
+
+  // Wrap a caller-owned FILE*: the caller remains responsible for closing.
+  bool FileOutStream::open(FILE* file)
+  {
+    mFile = file;
+    isOwned = false;
+    return (mFile != NULL);
+  }
+
+  // Writes exactly len bytes; a short write raises Error via HADOOP_ASSERT.
+  void FileOutStream::write(const void* buf, size_t len)
+  {
+    size_t result = fwrite(buf, len, 1, mFile);
+    HADOOP_ASSERT(result == 1,
+                  string("write error to file: ") + strerror(errno));
+  }
+
+  // Advance the file position by nbytes without writing.
+  bool FileOutStream::advance(size_t nbytes)
+  {
+    return (0==fseek(mFile, nbytes, SEEK_CUR));
+  }
+
+  // Closes the FILE* only if this stream owns it; always clears mFile.
+  bool FileOutStream::close()
+  {
+    int ret = 0;
+    if (mFile != NULL && isOwned) {
+      ret = fclose(mFile);
+    }
+    mFile = NULL;
+    return (ret == 0);
+  }
+
+  void FileOutStream::flush()
+  {
+    fflush(mFile);
+  }
+
+  FileOutStream::~FileOutStream()
+  {
+    if (mFile != NULL) {
+      close();
+    }
+  }
+
+  // Holds the string by reference and starts reading at its beginning;
+  // the string must outlive this stream.
+  StringInStream::StringInStream(const std::string& str): buffer(str) {
+    itr = buffer.begin();
+  }
+
+  // Copies buflen bytes from the string into buf, advancing the cursor.
+  // Raises Error (via HADOOP_ASSERT) if fewer than buflen bytes remain.
+  void StringInStream::read(void *buf, size_t buflen) {
+    size_t bytes = 0;
+    char* output = (char*) buf;
+    std::string::const_iterator end = buffer.end();
+    // BUGFIX: test for end-of-string BEFORE dereferencing the iterator.
+    // The old loop read *itr first and checked afterwards, so calling
+    // read() with the cursor already at end() dereferenced end() (UB).
+    while (bytes < buflen && itr != end) {
+      output[bytes++] = *itr;
+      ++itr;
+    }
+    HADOOP_ASSERT(bytes == buflen, "unexpected end of string reached");
+  }
+
+  // Ints share the long wire format (Hadoop variable-length encoding).
+  void serializeInt(int32_t t, OutStream& stream) {
+    serializeLong(t,stream);
+  }
+
+  // Hadoop WritableUtils-compatible variable-length long encoding:
+  // values in [-112, 127] are written as a single byte; otherwise a
+  // length marker byte (-113..-120 for positive, -121..-128 for negative
+  // values) is followed by the magnitude's big-endian bytes.
+  void serializeLong(int64_t t, OutStream& stream)
+  {
+    if (t >= -112 && t <= 127) {
+      int8_t b = t;
+      stream.write(&b, 1);
+      return;
+    }
+        
+    int8_t len = -112;
+    if (t < 0) {
+      t ^= -1ll; // reset the sign bit
+      len = -120;
+    }
+        
+    // Count the significant bytes of the (now non-negative) value by
+    // decrementing the marker once per byte.
+    uint64_t tmp = t;
+    while (tmp != 0) {
+      tmp = tmp >> 8;
+      len--;
+    }
+  
+    stream.write(&len, 1);      
+    // Recover the byte count from the marker.
+    len = (len < -120) ? -(len + 120) : -(len + 112);
+        
+    // Emit the value most-significant byte first.
+    for (uint32_t idx = len; idx != 0; idx--) {
+      uint32_t shiftbits = (idx - 1) * 8;
+      uint64_t mask = 0xFFll << shiftbits;
+      uint8_t b = (t & mask) >> shiftbits;
+      stream.write(&b, 1);
+    }
+  }
+
+  // Ints share the long wire format (Hadoop variable-length encoding);
+  // the value is narrowed to 32 bits on return.
+  int32_t deserializeInt(InStream& stream) {
+    return deserializeLong(stream);
+  }
+
+  // Decodes the Hadoop WritableUtils variable-length long format written
+  // by serializeLong: a first byte >= -112 is the value itself; otherwise
+  // it is a marker giving sign and byte count, followed by the magnitude
+  // in big-endian order.
+  int64_t deserializeLong(InStream& stream)
+  {
+    int8_t b;
+    stream.read(&b, 1);
+    if (b >= -112) {
+      return b;
+    }
+    bool negative;
+    int len;
+    if (b < -120) {
+      negative = true;
+      len = -120 - b;
+    } else {
+      negative = false;
+      len = -112 - b;
+    }
+    // len is always in [1, 8] (b is an int8_t in [-128, -113] here).
+    // BUGFIX: use a fixed-size buffer instead of the variable-length
+    // array "uint8_t barr[len]", which is not standard C++.
+    uint8_t barr[8];
+    stream.read(barr, len);
+    int64_t t = 0;
+    for (int idx = 0; idx < len; idx++) {
+      t = t << 8;
+      t |= (barr[idx] & 0xFF);
+    }
+    if (negative) {
+      t ^= -1ll;
+    }
+    return t;
+  }
+
+  // Writes the float in XDR (big-endian IEEE 754) representation.
+  void serializeFloat(float t, OutStream& stream)
+  {
+    char buf[sizeof(float)];
+    XDR xdrs;
+    xdrmem_create(&xdrs, buf, sizeof(float), XDR_ENCODE);
+    xdr_float(&xdrs, &t);
+    stream.write(buf, sizeof(float));
+  }
+
+  // Reads an XDR-encoded float into the out-parameter t.
+  void deserializeFloat(float& t, InStream& stream)
+  {
+    char buf[sizeof(float)];
+    stream.read(buf, sizeof(float));
+    XDR xdrs;
+    xdrmem_create(&xdrs, buf, sizeof(float), XDR_DECODE);
+    xdr_float(&xdrs, &t);
+  }
+
+  // Writes the length as a variable-length int, then the raw bytes.
+  void serializeString(const std::string& t, OutStream& stream)
+  {
+    serializeInt(t.length(), stream);
+    if (t.length() > 0) {
+      stream.write(t.data(), t.length());
+    }
+  }
+
+  // Reads a length-prefixed string; a non-positive length yields the
+  // empty string.
+  void deserializeString(std::string& t, InStream& stream)
+  {
+    int32_t len = deserializeInt(stream);
+    if (len > 0) {
+      // resize the string to the right length
+      t.resize(len);
+      // read into the string in 64k chunks
+      const int bufSize = 65536;
+      int offset = 0;
+      char buf[bufSize];
+      while (len > 0) {
+        int chunkLength = len > bufSize ? bufSize : len;
+        stream.read(buf, chunkLength);
+        t.replace(offset, chunkLength, buf, chunkLength);
+        offset += chunkLength;
+        len -= chunkLength;
+      }
+    } else {
+      t.clear();
+    }
+  }
+
+}



Mime
View raw message