hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acmur...@apache.org
Subject svn commit: r615423 [3/5] - in /hadoop/core/trunk: ./ src/c++/pipes/ src/c++/pipes/impl/ src/c++/utils/m4/ src/examples/pipes/ src/examples/pipes/impl/
Date Sat, 26 Jan 2008 01:48:44 GMT
Modified: hadoop/core/trunk/src/c++/pipes/configure.ac
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/c%2B%2B/pipes/configure.ac?rev=615423&r1=615422&r2=615423&view=diff
==============================================================================
--- hadoop/core/trunk/src/c++/pipes/configure.ac (original)
+++ hadoop/core/trunk/src/c++/pipes/configure.ac Fri Jan 25 17:48:42 2008
@@ -29,6 +29,7 @@
 AC_PREFIX_DEFAULT(`pwd`/../install)
 
 USE_HADOOP_UTILS
+HADOOP_PIPES_SETUP
 CHECK_INSTALL_CFLAG
 
 # Checks for programs.

Modified: hadoop/core/trunk/src/c++/pipes/impl/HadoopPipes.cc
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/c%2B%2B/pipes/impl/HadoopPipes.cc?rev=615423&r1=615422&r2=615423&view=diff
==============================================================================
--- hadoop/core/trunk/src/c++/pipes/impl/HadoopPipes.cc (original)
+++ hadoop/core/trunk/src/c++/pipes/impl/HadoopPipes.cc Fri Jan 25 17:48:42 2008
@@ -30,6 +30,7 @@
 #include <stdlib.h>
 #include <strings.h>
 #include <sys/socket.h>
+#include <pthread.h>
 
 using std::map;
 using std::string;
@@ -584,6 +585,7 @@
     Partitioner* partitioner;
     int numReduces;
     const Factory* factory;
+    pthread_mutex_t mutexDone;
 
   public:
 
@@ -607,6 +609,7 @@
       lastProgress = 0;
       progressFloat = 0.0f;
       hasTask = false;
+      pthread_mutex_init(&mutexDone, NULL);
     }
 
     void setProtocol(Protocol* _protocol, UpwardProtocol* _uplink) {
@@ -689,11 +692,16 @@
     }
     
     virtual bool isDone() {
-      return done;
+      pthread_mutex_lock(&mutexDone);  // done is set under this mutex elsewhere in the class and polled by the ping thread
+      bool doneCopy = done;  // snapshot taken while the lock is held
+      pthread_mutex_unlock(&mutexDone);
+      return doneCopy;  // return the snapshot, not the shared field
     }
 
     virtual void close() {
+      pthread_mutex_lock(&mutexDone);  // same mutex that isDone() takes before reading done
       done = true;
+      pthread_mutex_unlock(&mutexDone);
     }
 
     virtual void abort() {
@@ -717,7 +725,9 @@
         key = *newKey;
       } else {
         if (!reader->next(key, const_cast<string&>(*value))) {
+          pthread_mutex_lock(&mutexDone);
           done = true;
+          pthread_mutex_unlock(&mutexDone);
           return false;
         }
         progressFloat = reader->getProgress();
@@ -856,10 +866,59 @@
       delete reducer;
       delete writer;
       delete partitioner;
+      pthread_mutex_destroy(&mutexDone);
     }
   };
 
   /**
+   * Ping the parent every 5 seconds to check that it is still alive; exit after 3 consecutive failed pings.
+   */
+  void* ping(void* ptr) {  // thread start routine; ptr must point to the TaskContextImpl
+    TaskContextImpl* context = (TaskContextImpl*) ptr;
+    char* portStr = getenv("hadoop.pipes.command.port");  // NULL => loop only sleeps, no socket is opened
+    int MAX_RETRIES = 3;  // consecutive failed pings tolerated before exit(1)
+    int remaining_retries = MAX_RETRIES;
+    while (!context->isDone()) {
+      int sock = -1;  // declared outside the try so the handler below can release it
+      try{
+        sleep(5);
+        if (portStr) {
+          sock = socket(PF_INET, SOCK_STREAM, 0);
+          HADOOP_ASSERT(sock != - 1,
+                        string("problem creating socket: ") + strerror(errno));
+          sockaddr_in addr;
+          addr.sin_family = AF_INET;
+          addr.sin_port = htons(toInt(portStr));
+          addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+          HADOOP_ASSERT(connect(sock, (sockaddr*) &addr, sizeof(addr)) == 0,
+                        string("problem connecting command socket: ") +
+                        strerror(errno));
+        }
+        if (sock != -1) {
+          int result = shutdown(sock, SHUT_RDWR);
+          HADOOP_ASSERT(result == 0, "problem shutting socket");
+          result = close(sock);
+          sock = -1;  // never close the same descriptor twice, even when close() reported an error
+          HADOOP_ASSERT(result == 0, "problem closing socket");
+        }
+        remaining_retries = MAX_RETRIES;  // a successful ping restores the full retry budget
+      } catch (Error& err) {
+        if (sock != -1) close(sock);  // release the fd when a failure skipped the normal close above
+        if (!context->isDone()) {
+          fprintf(stderr, "Hadoop Pipes Exception: in ping %s\n", 
+                err.getMessage().c_str());
+          if (--remaining_retries == 0) {
+            exit(1);
+          }
+        } else {
+          return NULL;
+        }
+      }
+    }
+    return NULL;
+  }
+
+  /**
    * Run the assigned task in the framework.
    * The user's main function should set the various functions using the 
    * set* functions above and then call this.
@@ -914,12 +973,15 @@
         connection = new TextProtocol(stdin, context, stdout);
       }
       context->setProtocol(connection, connection->getUplink());
+      pthread_t pingThread;
+      pthread_create(&pingThread, NULL, ping, (void*)(context));
       context->waitForTask();
       while (!context->isDone()) {
         context->nextKey();
       }
       context->closeAll();
       connection->getUplink()->done();
+      pthread_join(pingThread,NULL);
       delete context;
       delete connection;
       if (stream != NULL) {

Modified: hadoop/core/trunk/src/c++/pipes/impl/config.h.in
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/c%2B%2B/pipes/impl/config.h.in?rev=615423&r1=615422&r2=615423&view=diff
==============================================================================
--- hadoop/core/trunk/src/c++/pipes/impl/config.h.in (original)
+++ hadoop/core/trunk/src/c++/pipes/impl/config.h.in Fri Jan 25 17:48:42 2008
@@ -10,11 +10,17 @@
 /* Define to 1 if you have the <inttypes.h> header file. */
 #undef HAVE_INTTYPES_H
 
+/* Define to 1 if you have the `pthread' library (-lpthread). */
+#undef HAVE_LIBPTHREAD
+
 /* Define to 1 if you have the <memory.h> header file. */
 #undef HAVE_MEMORY_H
 
 /* Define to 1 if you have the `mkdir' function. */
 #undef HAVE_MKDIR
+
+/* Define to 1 if you have the <pthread.h> header file. */
+#undef HAVE_PTHREAD_H
 
 /* Define to 1 if stdbool.h conforms to C99. */
 #undef HAVE_STDBOOL_H

Modified: hadoop/core/trunk/src/c++/utils/m4/hadoop_utils.m4
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/c%2B%2B/utils/m4/hadoop_utils.m4?rev=615423&r1=615422&r2=615423&view=diff
==============================================================================
--- hadoop/core/trunk/src/c++/utils/m4/hadoop_utils.m4 (original)
+++ hadoop/core/trunk/src/c++/utils/m4/hadoop_utils.m4 Fri Jan 25 17:48:42 2008
@@ -46,9 +46,17 @@
 AC_SUBST(HADOOP_UTILS_PREFIX)
 ])
 
+AC_DEFUN([HADOOP_PIPES_SETUP],[dnl Abort configure unless pthread.h and libpthread are usable; each error message is quoted so that a comma in it is not taken as an argument separator.
+AC_CHECK_HEADERS([pthread.h], [],
+  [AC_MSG_ERROR([Please check if you have installed the pthread library])])
+AC_CHECK_LIB([pthread], [pthread_create], [],
+  [AC_MSG_ERROR([Cannot find libpthread.so, please check])])
+])
+
 # define a macro for using hadoop pipes
 AC_DEFUN([USE_HADOOP_PIPES],[
 AC_REQUIRE([USE_HADOOP_UTILS])
+AC_REQUIRE([HADOOP_PIPES_SETUP])
 AC_ARG_WITH([hadoop-pipes],
             AS_HELP_STRING([--with-hadoop-pipes=<dir>],
                            [directory to get hadoop pipes from]),



Mime
View raw message