Subject: svn commit: r1465852 [6/15] - in /hama/trunk: ./ bin/ c++/ c++/pipes/ c++/pipes/api/ c++/pipes/api/hama/ c++/pipes/debug/ c++/pipes/impl/ c++/utils/ c++/utils/api/ c++/utils/api/hadoop/ c++/utils/impl/ c++/utils/m4/ core/src/main/java/org/apache/hama/b... 
Date: Tue, 09 Apr 2013 01:28:06 -0000 To: commits@hama.apache.org From: edwardyoon@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20130409012810.AB1422388ABC@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Added: hama/trunk/c++/pipes/configure.ac URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/pipes/configure.ac?rev=1465852&view=auto ============================================================================== --- hama/trunk/c++/pipes/configure.ac (added) +++ hama/trunk/c++/pipes/configure.ac Tue Apr 9 01:28:04 2013 @@ -0,0 +1,54 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# -*- Autoconf -*- +# Process this file with autoconf to produce a configure script. + +AC_PREREQ(2.59) +AC_INIT(hama-pipes, 0.1.0, martin.illecker@gmail.com) + +AM_INIT_AUTOMAKE([subdir-objects foreign no-dist]) + +AC_CONFIG_SRCDIR([impl/HamaPipes.cc]) +AC_CONFIG_HEADER([impl/config.h]) +AC_CONFIG_FILES([Makefile]) + +AC_PREFIX_DEFAULT(`pwd`/../install) + +USE_HADOOP_UTILS +HADOOP_PIPES_SETUP +CHECK_INSTALL_CFLAG + +# Checks for programs. +AC_PROG_CXX +AC_PROG_LIBTOOL + +# Checks for libraries. + +# Checks for header files. 
+AC_LANG(C++) +AC_CHECK_HEADERS([unistd.h]) + +# Checks for typedefs, structures, and compiler characteristics. +AC_HEADER_STDBOOL +AC_C_CONST +AC_TYPE_OFF_T +AC_TYPE_SIZE_T +AC_FUNC_STRERROR_R + +# Checks for library functions. +AC_CHECK_FUNCS([mkdir uname]) +AC_OUTPUT Added: hama/trunk/c++/pipes/debug/pipes-default-gdb-commands.txt URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/pipes/debug/pipes-default-gdb-commands.txt?rev=1465852&view=auto ============================================================================== --- hama/trunk/c++/pipes/debug/pipes-default-gdb-commands.txt (added) +++ hama/trunk/c++/pipes/debug/pipes-default-gdb-commands.txt Tue Apr 9 01:28:04 2013 @@ -0,0 +1,3 @@ +info threads +backtrace +quit Added: hama/trunk/c++/pipes/debug/pipes-default-script URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/pipes/debug/pipes-default-script?rev=1465852&view=auto ============================================================================== --- hama/trunk/c++/pipes/debug/pipes-default-script (added) +++ hama/trunk/c++/pipes/debug/pipes-default-script Tue Apr 9 01:28:04 2013 @@ -0,0 +1,3 @@ +core=`find . -name 'core*'` +#Only pipes programs have 5th argument as program name. +gdb -quiet $5 -c $core -x $HADOOP_HOME/src/c++/pipes/debug/pipes-default-gdb-commands.txt Added: hama/trunk/c++/pipes/depcomp URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/pipes/depcomp?rev=1465852&view=auto ============================================================================== --- hama/trunk/c++/pipes/depcomp (added) +++ hama/trunk/c++/pipes/depcomp Tue Apr 9 01:28:04 2013 @@ -0,0 +1,530 @@ +#! /bin/sh +# depcomp - compile a program generating dependencies as side-effects + +scriptversion=2005-07-09.11 + +# Copyright (C) 1999, 2000, 2003, 2004, 2005 Free Software Foundation, Inc. 
+ +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2, or (at your option) +# any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA +# 02110-1301, USA. + +# As a special exception to the GNU General Public License, if you +# distribute this file as part of a program that contains a +# configuration script generated by Autoconf, you may include it under +# the same distribution terms that you use for the rest of that program. + +# Originally written by Alexandre Oliva . + +case $1 in + '') + echo "$0: No command. Try \`$0 --help' for more information." 1>&2 + exit 1; + ;; + -h | --h*) + cat <<\EOF +Usage: depcomp [--help] [--version] PROGRAM [ARGS] + +Run PROGRAMS ARGS to compile a file, generating dependencies +as side-effects. + +Environment variables: + depmode Dependency tracking mode. + source Source file read by `PROGRAMS ARGS'. + object Object file output by `PROGRAMS ARGS'. + DEPDIR directory where to store dependencies. + depfile Dependency file to output. + tmpdepfile Temporary file to use when outputing dependencies. + libtool Whether libtool is used (yes/no). + +Report bugs to . +EOF + exit $? + ;; + -v | --v*) + echo "depcomp $scriptversion" + exit $? + ;; +esac + +if test -z "$depmode" || test -z "$source" || test -z "$object"; then + echo "depcomp: Variables source, object and depmode must be set" 1>&2 + exit 1 +fi + +# Dependencies for sub/bar.o or sub/bar.obj go into sub/.deps/bar.Po. 
+depfile=${depfile-`echo "$object" | + sed 's|[^\\/]*$|'${DEPDIR-.deps}'/&|;s|\.\([^.]*\)$|.P\1|;s|Pobj$|Po|'`} +tmpdepfile=${tmpdepfile-`echo "$depfile" | sed 's/\.\([^.]*\)$/.T\1/'`} + +rm -f "$tmpdepfile" + +# Some modes work just like other modes, but use different flags. We +# parameterize here, but still list the modes in the big case below, +# to make depend.m4 easier to write. Note that we *cannot* use a case +# here, because this file can only contain one case statement. +if test "$depmode" = hp; then + # HP compiler uses -M and no extra arg. + gccflag=-M + depmode=gcc +fi + +if test "$depmode" = dashXmstdout; then + # This is just like dashmstdout with a different argument. + dashmflag=-xM + depmode=dashmstdout +fi + +case "$depmode" in +gcc3) +## gcc 3 implements dependency tracking that does exactly what +## we want. Yay! Note: for some reason libtool 1.4 doesn't like +## it if -MD -MP comes after the -MF stuff. Hmm. + "$@" -MT "$object" -MD -MP -MF "$tmpdepfile" + stat=$? + if test $stat -eq 0; then : + else + rm -f "$tmpdepfile" + exit $stat + fi + mv "$tmpdepfile" "$depfile" + ;; + +gcc) +## There are various ways to get dependency output from gcc. Here's +## why we pick this rather obscure method: +## - Don't want to use -MD because we'd like the dependencies to end +## up in a subdir. Having to rename by hand is ugly. +## (We might end up doing this anyway to support other compilers.) +## - The DEPENDENCIES_OUTPUT environment variable makes gcc act like +## -MM, not -M (despite what the docs say). +## - Using -M directly means running the compiler twice (even worse +## than renaming). + if test -z "$gccflag"; then + gccflag=-MD, + fi + "$@" -Wp,"$gccflag$tmpdepfile" + stat=$? + if test $stat -eq 0; then : + else + rm -f "$tmpdepfile" + exit $stat + fi + rm -f "$depfile" + echo "$object : \\" > "$depfile" + alpha=ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz +## The second -e expression handles DOS-style file names with drive letters. 
+ sed -e 's/^[^:]*: / /' \ + -e 's/^['$alpha']:\/[^:]*: / /' < "$tmpdepfile" >> "$depfile" +## This next piece of magic avoids the `deleted header file' problem. +## The problem is that when a header file which appears in a .P file +## is deleted, the dependency causes make to die (because there is +## typically no way to rebuild the header). We avoid this by adding +## dummy dependencies for each header file. Too bad gcc doesn't do +## this for us directly. + tr ' ' ' +' < "$tmpdepfile" | +## Some versions of gcc put a space before the `:'. On the theory +## that the space means something, we add a space to the output as +## well. +## Some versions of the HPUX 10.20 sed can't process this invocation +## correctly. Breaking it into two sed invocations is a workaround. + sed -e 's/^\\$//' -e '/^$/d' -e '/:$/d' | sed -e 's/$/ :/' >> "$depfile" + rm -f "$tmpdepfile" + ;; + +hp) + # This case exists only to let depend.m4 do its work. It works by + # looking at the text of this script. This case will never be run, + # since it is checked for above. + exit 1 + ;; + +sgi) + if test "$libtool" = yes; then + "$@" "-Wp,-MDupdate,$tmpdepfile" + else + "$@" -MDupdate "$tmpdepfile" + fi + stat=$? + if test $stat -eq 0; then : + else + rm -f "$tmpdepfile" + exit $stat + fi + rm -f "$depfile" + + if test -f "$tmpdepfile"; then # yes, the sourcefile depend on other files + echo "$object : \\" > "$depfile" + + # Clip off the initial element (the dependent). Don't try to be + # clever and replace this with sed code, as IRIX sed won't handle + # lines with more than a fixed number of characters (4096 in + # IRIX 6.2 sed, 8192 in IRIX 6.5). We also remove comment lines; + # the IRIX cc adds comments like `#:fec' to the end of the + # dependency line. + tr ' ' ' +' < "$tmpdepfile" \ + | sed -e 's/^.*\.o://' -e 's/#.*$//' -e '/^$/ d' | \ + tr ' +' ' ' >> $depfile + echo >> $depfile + + # The second pass generates a dummy entry for each header file. 
+ tr ' ' ' +' < "$tmpdepfile" \ + | sed -e 's/^.*\.o://' -e 's/#.*$//' -e '/^$/ d' -e 's/$/:/' \ + >> $depfile + else + # The sourcefile does not contain any dependencies, so just + # store a dummy comment line, to avoid errors with the Makefile + # "include basename.Plo" scheme. + echo "#dummy" > "$depfile" + fi + rm -f "$tmpdepfile" + ;; + +aix) + # The C for AIX Compiler uses -M and outputs the dependencies + # in a .u file. In older versions, this file always lives in the + # current directory. Also, the AIX compiler puts `$object:' at the + # start of each line; $object doesn't have directory information. + # Version 6 uses the directory in both cases. + stripped=`echo "$object" | sed 's/\(.*\)\..*$/\1/'` + tmpdepfile="$stripped.u" + if test "$libtool" = yes; then + "$@" -Wc,-M + else + "$@" -M + fi + stat=$? + + if test -f "$tmpdepfile"; then : + else + stripped=`echo "$stripped" | sed 's,^.*/,,'` + tmpdepfile="$stripped.u" + fi + + if test $stat -eq 0; then : + else + rm -f "$tmpdepfile" + exit $stat + fi + + if test -f "$tmpdepfile"; then + outname="$stripped.o" + # Each line is of the form `foo.o: dependent.h'. + # Do two passes, one to just change these to + # `$object: dependent.h' and one to simply `dependent.h:'. + sed -e "s,^$outname:,$object :," < "$tmpdepfile" > "$depfile" + sed -e "s,^$outname: \(.*\)$,\1:," < "$tmpdepfile" >> "$depfile" + else + # The sourcefile does not contain any dependencies, so just + # store a dummy comment line, to avoid errors with the Makefile + # "include basename.Plo" scheme. + echo "#dummy" > "$depfile" + fi + rm -f "$tmpdepfile" + ;; + +icc) + # Intel's C compiler understands `-MD -MF file'. However on + # icc -MD -MF foo.d -c -o sub/foo.o sub/foo.c + # ICC 7.0 will fill foo.d with something like + # foo.o: sub/foo.c + # foo.o: sub/foo.h + # which is wrong. 
We want: + # sub/foo.o: sub/foo.c + # sub/foo.o: sub/foo.h + # sub/foo.c: + # sub/foo.h: + # ICC 7.1 will output + # foo.o: sub/foo.c sub/foo.h + # and will wrap long lines using \ : + # foo.o: sub/foo.c ... \ + # sub/foo.h ... \ + # ... + + "$@" -MD -MF "$tmpdepfile" + stat=$? + if test $stat -eq 0; then : + else + rm -f "$tmpdepfile" + exit $stat + fi + rm -f "$depfile" + # Each line is of the form `foo.o: dependent.h', + # or `foo.o: dep1.h dep2.h \', or ` dep3.h dep4.h \'. + # Do two passes, one to just change these to + # `$object: dependent.h' and one to simply `dependent.h:'. + sed "s,^[^:]*:,$object :," < "$tmpdepfile" > "$depfile" + # Some versions of the HPUX 10.20 sed can't process this invocation + # correctly. Breaking it into two sed invocations is a workaround. + sed 's,^[^:]*: \(.*\)$,\1,;s/^\\$//;/^$/d;/:$/d' < "$tmpdepfile" | + sed -e 's/$/ :/' >> "$depfile" + rm -f "$tmpdepfile" + ;; + +tru64) + # The Tru64 compiler uses -MD to generate dependencies as a side + # effect. `cc -MD -o foo.o ...' puts the dependencies into `foo.o.d'. + # At least on Alpha/Redhat 6.1, Compaq CCC V6.2-504 seems to put + # dependencies in `foo.d' instead, so we check for that too. + # Subdirectories are respected. + dir=`echo "$object" | sed -e 's|/[^/]*$|/|'` + test "x$dir" = "x$object" && dir= + base=`echo "$object" | sed -e 's|^.*/||' -e 's/\.o$//' -e 's/\.lo$//'` + + if test "$libtool" = yes; then + # With Tru64 cc, shared objects can also be used to make a + # static library. This mecanism is used in libtool 1.4 series to + # handle both shared and static libraries in a single compilation. + # With libtool 1.4, dependencies were output in $dir.libs/$base.lo.d. + # + # With libtool 1.5 this exception was removed, and libtool now + # generates 2 separate objects for the 2 libraries. These two + # compilations output dependencies in in $dir.libs/$base.o.d and + # in $dir$base.o.d. 
We have to check for both files, because + # one of the two compilations can be disabled. We should prefer + # $dir$base.o.d over $dir.libs/$base.o.d because the latter is + # automatically cleaned when .libs/ is deleted, while ignoring + # the former would cause a distcleancheck panic. + tmpdepfile1=$dir.libs/$base.lo.d # libtool 1.4 + tmpdepfile2=$dir$base.o.d # libtool 1.5 + tmpdepfile3=$dir.libs/$base.o.d # libtool 1.5 + tmpdepfile4=$dir.libs/$base.d # Compaq CCC V6.2-504 + "$@" -Wc,-MD + else + tmpdepfile1=$dir$base.o.d + tmpdepfile2=$dir$base.d + tmpdepfile3=$dir$base.d + tmpdepfile4=$dir$base.d + "$@" -MD + fi + + stat=$? + if test $stat -eq 0; then : + else + rm -f "$tmpdepfile1" "$tmpdepfile2" "$tmpdepfile3" "$tmpdepfile4" + exit $stat + fi + + for tmpdepfile in "$tmpdepfile1" "$tmpdepfile2" "$tmpdepfile3" "$tmpdepfile4" + do + test -f "$tmpdepfile" && break + done + if test -f "$tmpdepfile"; then + sed -e "s,^.*\.[a-z]*:,$object:," < "$tmpdepfile" > "$depfile" + # That's a tab and a space in the []. + sed -e 's,^.*\.[a-z]*:[ ]*,,' -e 's,$,:,' < "$tmpdepfile" >> "$depfile" + else + echo "#dummy" > "$depfile" + fi + rm -f "$tmpdepfile" + ;; + +#nosideeffect) + # This comment above is used by automake to tell side-effect + # dependency tracking mechanisms from slower ones. + +dashmstdout) + # Important note: in order to support this mode, a compiler *must* + # always write the preprocessed file to stdout, regardless of -o. + "$@" || exit $? + + # Remove the call to Libtool. + if test "$libtool" = yes; then + while test $1 != '--mode=compile'; do + shift + done + shift + fi + + # Remove `-o $object'. + IFS=" " + for arg + do + case $arg in + -o) + shift + ;; + $object) + shift + ;; + *) + set fnord "$@" "$arg" + shift # fnord + shift # $arg + ;; + esac + done + + test -z "$dashmflag" && dashmflag=-M + # Require at least two characters before searching for `:' + # in the target name. 
This is to cope with DOS-style filenames: + # a dependency such as `c:/foo/bar' could be seen as target `c' otherwise. + "$@" $dashmflag | + sed 's:^[ ]*[^: ][^:][^:]*\:[ ]*:'"$object"'\: :' > "$tmpdepfile" + rm -f "$depfile" + cat < "$tmpdepfile" > "$depfile" + tr ' ' ' +' < "$tmpdepfile" | \ +## Some versions of the HPUX 10.20 sed can't process this invocation +## correctly. Breaking it into two sed invocations is a workaround. + sed -e 's/^\\$//' -e '/^$/d' -e '/:$/d' | sed -e 's/$/ :/' >> "$depfile" + rm -f "$tmpdepfile" + ;; + +dashXmstdout) + # This case only exists to satisfy depend.m4. It is never actually + # run, as this mode is specially recognized in the preamble. + exit 1 + ;; + +makedepend) + "$@" || exit $? + # Remove any Libtool call + if test "$libtool" = yes; then + while test $1 != '--mode=compile'; do + shift + done + shift + fi + # X makedepend + shift + cleared=no + for arg in "$@"; do + case $cleared in + no) + set ""; shift + cleared=yes ;; + esac + case "$arg" in + -D*|-I*) + set fnord "$@" "$arg"; shift ;; + # Strip any option that makedepend may not understand. Remove + # the object too, otherwise makedepend will parse it as a source file. + -*|$object) + ;; + *) + set fnord "$@" "$arg"; shift ;; + esac + done + obj_suffix="`echo $object | sed 's/^.*\././'`" + touch "$tmpdepfile" + ${MAKEDEPEND-makedepend} -o"$obj_suffix" -f"$tmpdepfile" "$@" + rm -f "$depfile" + cat < "$tmpdepfile" > "$depfile" + sed '1,2d' "$tmpdepfile" | tr ' ' ' +' | \ +## Some versions of the HPUX 10.20 sed can't process this invocation +## correctly. Breaking it into two sed invocations is a workaround. + sed -e 's/^\\$//' -e '/^$/d' -e '/:$/d' | sed -e 's/$/ :/' >> "$depfile" + rm -f "$tmpdepfile" "$tmpdepfile".bak + ;; + +cpp) + # Important note: in order to support this mode, a compiler *must* + # always write the preprocessed file to stdout. + "$@" || exit $? + + # Remove the call to Libtool. 
+ if test "$libtool" = yes; then + while test $1 != '--mode=compile'; do + shift + done + shift + fi + + # Remove `-o $object'. + IFS=" " + for arg + do + case $arg in + -o) + shift + ;; + $object) + shift + ;; + *) + set fnord "$@" "$arg" + shift # fnord + shift # $arg + ;; + esac + done + + "$@" -E | + sed -n -e '/^# [0-9][0-9]* "\([^"]*\)".*/ s:: \1 \\:p' \ + -e '/^#line [0-9][0-9]* "\([^"]*\)".*/ s:: \1 \\:p' | + sed '$ s: \\$::' > "$tmpdepfile" + rm -f "$depfile" + echo "$object : \\" > "$depfile" + cat < "$tmpdepfile" >> "$depfile" + sed < "$tmpdepfile" '/^$/d;s/^ //;s/ \\$//;s/$/ :/' >> "$depfile" + rm -f "$tmpdepfile" + ;; + +msvisualcpp) + # Important note: in order to support this mode, a compiler *must* + # always write the preprocessed file to stdout, regardless of -o, + # because we must use -o when running libtool. + "$@" || exit $? + IFS=" " + for arg + do + case "$arg" in + "-Gm"|"/Gm"|"-Gi"|"/Gi"|"-ZI"|"/ZI") + set fnord "$@" + shift + shift + ;; + *) + set fnord "$@" "$arg" + shift + shift + ;; + esac + done + "$@" -E | + sed -n '/^#line [0-9][0-9]* "\([^"]*\)"/ s::echo "`cygpath -u \\"\1\\"`":p' | sort | uniq > "$tmpdepfile" + rm -f "$depfile" + echo "$object : \\" > "$depfile" + . "$tmpdepfile" | sed 's% %\\ %g' | sed -n '/^\(.*\)$/ s:: \1 \\:p' >> "$depfile" + echo " " >> "$depfile" + . 
"$tmpdepfile" | sed 's% %\\ %g' | sed -n '/^\(.*\)$/ s::\1\::p' >> "$depfile" + rm -f "$tmpdepfile" + ;; + +none) + exec "$@" + ;; + +*) + echo "Unknown depmode $depmode" 1>&2 + exit 1 + ;; +esac + +exit 0 + +# Local Variables: +# mode: shell-script +# sh-indentation: 2 +# eval: (add-hook 'write-file-hooks 'time-stamp) +# time-stamp-start: "scriptversion=" +# time-stamp-format: "%:y-%02m-%02d.%02H" +# time-stamp-end: "$" +# End: Added: hama/trunk/c++/pipes/impl/HamaPipes.cc URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/pipes/impl/HamaPipes.cc?rev=1465852&view=auto ============================================================================== --- hama/trunk/c++/pipes/impl/HamaPipes.cc (added) +++ hama/trunk/c++/pipes/impl/HamaPipes.cc Tue Apr 9 01:28:04 2013 @@ -0,0 +1,1299 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "hama/Pipes.hh" +#include "hadoop/SerialUtils.hh" +#include "hadoop/StringUtils.hh" + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#define stringify( name ) # name + +using std::map; +using std::string; +using std::vector; +using std::cout; +using std::endl; + +using namespace HadoopUtils; + +namespace HamaPipes { + + bool logging; + + /********************************************/ + /****************** BSPJob ******************/ + /********************************************/ + class BSPJobImpl: public BSPJob { + private: + map<string, string> values; + public: + void set(const string& key, const string& value) { + values[key] = value; + } + + virtual bool hasKey(const string& key) const { + return values.find(key) != values.end(); + } + + virtual const string& get(const string& key) const { + map<string, string>::const_iterator itr = values.find(key); + if (itr == values.end()) { + throw Error("Key " + key + " not found in BSPJob"); + } + return itr->second; + } + + virtual int getInt(const string& key) const { + const string& val = get(key); + return toInt(val); + } + + virtual float getFloat(const string& key) const { + const string& val = get(key); + return toFloat(val); + } + + virtual bool getBoolean(const string&key) const { + const string& val = get(key); + return toBool(val); + } + }; + + /********************************************/ + /************* DownwardProtocol *************/ + /********************************************/ + class DownwardProtocol { + public: + virtual void start(int protocol) = 0; + virtual void setBSPJob(vector<string> values) = 0; + virtual void setInputTypes(string keyType, string valueType) = 0; + virtual void setKeyValue(const string& _key, const string& _value) = 0; + + virtual void runBsp(bool pipedInput, bool pipedOutput) = 0; + virtual void runCleanup(bool pipedInput, bool pipedOutput) = 0; + virtual void runSetup(bool pipedInput, 
bool pipedOutput) = 0; + virtual void runPartition(const string& key, const string& value, int32_t numTasks) = 0; + + virtual void setNewResult(int32_t value) = 0; + virtual void setNewResult(int64_t value) = 0; + virtual void setNewResult(const string& value) = 0; + virtual void setNewResult(vector<string> value) = 0; + + //virtual void reduceKey(const string& key) = 0; + //virtual void reduceValue(const string& value) = 0; + virtual void close() = 0; + virtual void abort() = 0; + virtual ~DownwardProtocol() {} + }; + + /********************************************/ + /************** UpwardProtocol **************/ + /********************************************/ + class UpwardProtocol { + public: + virtual void sendCMD(int32_t cmd) = 0; + virtual void sendCMD(int32_t cmd, int32_t value) = 0; + virtual void sendCMD(int32_t cmd, int32_t value, const string values[], int size) = 0; + virtual void sendCMD(int32_t cmd, const string& value) = 0; + virtual void sendCMD(int32_t cmd, const string values[], int size) = 0; + + //virtual void registerCounter(int id, const string& group, const string& name) = 0; + //virtual void incrementCounter(const TaskContext::Counter* counter, uint64_t amount) = 0; + virtual void incrementCounter(const string& group, const string& name, uint64_t amount) = 0; + virtual ~UpwardProtocol() {} + }; + + /********************************************/ + /***************** Protocol *****************/ + /********************************************/ + class Protocol { + public: + virtual void nextEvent() = 0; + virtual UpwardProtocol* getUplink() = 0; + virtual ~Protocol(){} + }; + + /********************************************/ + /*************** MESSAGE_TYPE ***************/ + /********************************************/ + enum MESSAGE_TYPE { + START_MESSAGE, SET_BSPJOB_CONF, SET_INPUT_TYPES, + RUN_SETUP, RUN_BSP, RUN_CLEANUP, + READ_KEYVALUE, WRITE_KEYVALUE, + GET_MSG, GET_MSG_COUNT, + SEND_MSG, SYNC, + GET_ALL_PEERNAME, GET_PEERNAME, + 
GET_PEER_INDEX, GET_PEER_COUNT, GET_SUPERSTEP_COUNT, + REOPEN_INPUT, CLEAR, + CLOSE, ABORT, + DONE, TASK_DONE, + REGISTER_COUNTER, INCREMENT_COUNTER, + SEQFILE_OPEN, SEQFILE_READNEXT, + SEQFILE_APPEND, SEQFILE_CLOSE, + PARTITION_REQUEST, PARTITION_RESPONSE + }; + + /* Only needed for debugging output */ + const char* messageTypeNames[] = { + stringify( START_MESSAGE ), stringify( SET_BSPJOB_CONF ), stringify( SET_INPUT_TYPES ), + stringify( RUN_SETUP ), stringify( RUN_BSP ), stringify( RUN_CLEANUP ), + stringify( READ_KEYVALUE ), stringify( WRITE_KEYVALUE ), + stringify( GET_MSG ), stringify( GET_MSG_COUNT ), + stringify( SEND_MSG ), stringify( SYNC ), + stringify( GET_ALL_PEERNAME ), stringify( GET_PEERNAME ), + stringify( GET_PEER_INDEX ), stringify( GET_PEER_COUNT ), stringify( GET_SUPERSTEP_COUNT ), + stringify( REOPEN_INPUT ), stringify( CLEAR ), + stringify( CLOSE ), stringify( ABORT ), + stringify( DONE ), stringify( TASK_DONE ), + stringify( REGISTER_COUNTER ), stringify( INCREMENT_COUNTER ), + stringify( SEQFILE_OPEN ), stringify( SEQFILE_READNEXT ), + stringify( SEQFILE_APPEND ), stringify( SEQFILE_CLOSE ), + stringify( PARTITION_REQUEST ), stringify( PARTITION_RESPONSE ) + }; + + /********************************************/ + /*********** BinaryUpwardProtocol ***********/ + /********************************************/ + class BinaryUpwardProtocol: public UpwardProtocol { + private: + FileOutStream* stream; + public: + BinaryUpwardProtocol(FILE* _stream) { + stream = new FileOutStream(); + HADOOP_ASSERT(stream->open(_stream), "problem opening stream"); + + } + + virtual void sendCMD(int32_t cmd) { + serializeInt(cmd, *stream); + stream->flush(); + if(logging)fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent CMD: %s\n", + messageTypeNames[cmd]); + } + + virtual void sendCMD(int32_t cmd, int32_t value) { + serializeInt(cmd, *stream); + serializeInt(value, *stream); + stream->flush(); + if(logging)fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent 
CMD: %s with Value: %d\n",messageTypeNames[cmd],value); + } + + virtual void sendCMD(int32_t cmd, const string& value) { + serializeInt(cmd, *stream); + serializeString(value, *stream); + stream->flush(); + if(logging)fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent CMD: %s with Value: %s\n",messageTypeNames[cmd],value.c_str()); + } + + virtual void sendCMD(int32_t cmd, const string values[], int size) { + serializeInt(cmd, *stream); + for (int i=0; i<size; i++) { + serializeString(values[i], *stream); + } + stream->flush(); + } + + virtual void sendCMD(int32_t cmd, int32_t value, const string values[], int size) { + serializeInt(cmd, *stream); + serializeInt(value, *stream); + for (int i=0; i<size; i++) { + serializeString(values[i], *stream); + } + stream->flush(); + } + + /* + virtual void registerCounter(int id, const string& group, + const string& name) { + serializeInt(REGISTER_COUNTER, *stream); + serializeInt(id, *stream); + serializeString(group, *stream); + serializeString(name, *stream); + } + + virtual void incrementCounter(const TaskContext::Counter* counter, + uint64_t amount) { + serializeInt(INCREMENT_COUNTER, *stream); + serializeInt(counter->getId(), *stream); + serializeLong(amount, *stream); + } + */ + + virtual void incrementCounter(const string& group, const string& name, uint64_t amount) { + serializeInt(INCREMENT_COUNTER, *stream); + serializeString(group, *stream); + serializeString(name, *stream); + serializeLong(amount, *stream); + stream->flush(); + if(logging)fprintf(stderr,"HamaPipes::BinaryUpwardProtocol sent incrementCounter\n"); + } + + ~BinaryUpwardProtocol() { + delete stream; + } + }; + + /********************************************/ + /************** BinaryProtocol **************/ + /********************************************/ + class BinaryProtocol: public Protocol { + private: + FileInStream* downStream; + DownwardProtocol* handler; + BinaryUpwardProtocol * uplink; + + string key; + string value; + + public: + BinaryProtocol(FILE* down, DownwardProtocol* _handler, FILE* up) { + downStream = new FileInStream(); + downStream->open(down); + uplink = new 
BinaryUpwardProtocol(up); + handler = _handler; + + //authDone = false; + //getPassword(password); + } + + UpwardProtocol* getUplink() { + return uplink; + } + + + virtual void nextEvent() { + int32_t cmd; + cmd = deserializeInt(*downStream); + + switch (cmd) { + + case START_MESSAGE: { + int32_t prot; + prot = deserializeInt(*downStream); + if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got START_MESSAGE prot: %d\n", prot); + handler->start(prot); + break; + } + /* SET BSP Job Configuration / Environment */ + case SET_BSPJOB_CONF: { + int32_t entries; + entries = deserializeInt(*downStream); + if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SET_BSPJOB_CONF entries: %d\n", entries); + vector<string> result(entries*2); + for(int i=0; i < entries*2; ++i) { + string item; + deserializeString(item, *downStream); + result.push_back(item); + if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SET_BSPJOB_CONF add NewEntry: %s\n", item.c_str()); + } + if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got all Configuration %d entries!\n", entries); + handler->setBSPJob(result); + break; + } + case SET_INPUT_TYPES: { + string keyType; + string valueType; + deserializeString(keyType, *downStream); + deserializeString(valueType, *downStream); + if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SET_INPUT_TYPES keyType: %s valueType: %s\n", + keyType.c_str(),valueType.c_str()); + handler->setInputTypes(keyType, valueType); + break; + } + case READ_KEYVALUE: { + deserializeString(key, *downStream); + deserializeString(value, *downStream); + if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got READ_KEYVALUE key: %s value: %s\n", + key.c_str(), + ((value.length()<10)?value.c_str():value.substr(0,9).c_str()) ); + handler->setKeyValue(key, value); + break; + } + case RUN_SETUP: { + if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got RUN_SETUP\n"); + int32_t 
pipedInput; + int32_t pipedOutput; + pipedInput = deserializeInt(*downStream); + pipedOutput = deserializeInt(*downStream); + handler->runSetup(pipedInput, pipedOutput); + break; + } + case RUN_BSP: { + if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got RUN_BSP\n"); + int32_t pipedInput; + int32_t pipedOutput; + pipedInput = deserializeInt(*downStream); + pipedOutput = deserializeInt(*downStream); + handler->runBsp(pipedInput, pipedOutput); + break; + } + case RUN_CLEANUP: { + if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got RUN_CLEANUP\n"); + int32_t pipedInput; + int32_t pipedOutput; + pipedInput = deserializeInt(*downStream); + pipedOutput = deserializeInt(*downStream); + handler->runCleanup(pipedInput, pipedOutput); + break; + } + + case PARTITION_REQUEST: { + if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got PARTITION_REQUEST\n"); + string partionKey; + string partionValue; + int32_t numTasks; + deserializeString(partionKey, *downStream); + deserializeString(partionValue, *downStream); + numTasks = deserializeInt(*downStream); + handler->runPartition(partionKey, partionValue, numTasks); + break; + } + + + case GET_MSG_COUNT: { + int32_t msgCount = deserializeInt(*downStream); + if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_MSG_COUNT msgCount: %d\n",msgCount); + handler->setNewResult(msgCount); + break; + } + case GET_MSG: { + string msg; + deserializeString(msg,*downStream); + if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_MSG msg: %s\n",msg.c_str()); + handler->setNewResult(msg); + break; + } + case GET_PEERNAME: { + string peername; + deserializeString(peername,*downStream); + if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_PEERNAME peername: %s\n",peername.c_str()); + handler->setNewResult(peername); + break; + } + case GET_ALL_PEERNAME: { + vector<string> peernames; + int32_t peernameCount = deserializeInt(*downStream); + 
if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_ALL_PEERNAME peernameCount: %d\n",peernameCount); + string peername; + for (int i=0; i<peernameCount; i++) { + deserializeString(peername,*downStream); + peernames.push_back(peername); + } + handler->setNewResult(peernames); + break; + } + case GET_PEER_INDEX: { + int32_t peerIndex = deserializeInt(*downStream); + if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_PEER_INDEX peerIndex: %d\n",peerIndex); + handler->setNewResult(peerIndex); + break; + } + case GET_PEER_COUNT: { + int32_t peerCount = deserializeInt(*downStream); + if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_PEER_COUNT peerCount: %d\n",peerCount); + handler->setNewResult(peerCount); + break; + } + case GET_SUPERSTEP_COUNT: { + int64_t superstepCount = deserializeLong(*downStream); + if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got GET_SUPERSTEP_COUNT superstepCount: %ld\n",(long)superstepCount); + handler->setNewResult(superstepCount); + break; + } + + + case SEQFILE_OPEN: { + int32_t fileID = deserializeInt(*downStream); + if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SEQFILE_OPEN fileID: %d\n",fileID); + handler->setNewResult(fileID); + break; + } + case SEQFILE_READNEXT: { + deserializeString(key, *downStream); + deserializeString(value, *downStream); + if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SEQFILE_READNEXT key: %s value: %s\n", + key.c_str(), + ((value.length()<10)?value.c_str():value.substr(0,9).c_str()) ); + handler->setKeyValue(key, value); + break; + } + case SEQFILE_APPEND: { + int32_t result = deserializeInt(*downStream); + if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SEQFILE_APPEND result: %d\n",result); + handler->setNewResult(result); + break; + } + case SEQFILE_CLOSE: { + int32_t result = deserializeInt(*downStream); + if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got SEQFILE_CLOSE result: %d\n",result); + handler->setNewResult(result); + break; + } + + +
case CLOSE: { + if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got CLOSE\n"); + handler->close(); + break; + } + case ABORT: { + if(logging)fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - got ABORT\n"); + handler->abort(); + break; + } + default: + HADOOP_ASSERT(false, "Unknown binary command " + toString(cmd)); + fprintf(stderr,"HamaPipes::BinaryProtocol::nextEvent - Unknown binary command: %d\n",cmd); + } + } + + virtual ~BinaryProtocol() { + delete downStream; + delete uplink; + } + }; + + /********************************************/ + /************** BSPContextImpl **************/ + /********************************************/ + class BSPContextImpl: public BSPContext, public DownwardProtocol { + private: + bool done; + BSPJob* job; + //string key; + //const string* newKey; + //const string* value; + bool hasTask; + //bool isNewKey; + //bool isNewValue; + string* inputKeyClass; + string* inputValueClass; + + //string status; + //float progressFloat; + //uint64_t lastProgress; + //bool statusSet; + + Protocol* protocol; + UpwardProtocol *uplink; + + //string* inputSplit; + + RecordReader* reader; + RecordWriter* writer; + + BSP* bsp; + Partitioner* partitioner; + + const Factory* factory; + pthread_mutex_t mutexDone; + std::vector<int> registeredCounterIds; + + int32_t resultInt; + bool isNewResultInt; + int64_t resultLong; + bool isNewResultLong; + string resultString; + bool isNewResultString; + vector<string> resultVector; + bool isNewResultVector; + + bool isNewKeyValuePair; + string currentKey; + string currentValue; + + public: + + BSPContextImpl(const Factory& _factory) { + //statusSet = false; + done = false; + //newKey = NULL; + factory = &_factory; + job = NULL; + + inputKeyClass = NULL; + inputValueClass = NULL; + + //inputSplit = NULL; + + bsp = NULL; + reader = NULL; + writer = NULL; + partitioner = NULL; + protocol = NULL; + //isNewKey = false; + //isNewValue = false; + //lastProgress = 0; + //progressFloat = 0.0f; + hasTask =
false; + pthread_mutex_init(&mutexDone, NULL); + + isNewResultInt = false; + isNewResultLong = false; + isNewResultString = false; + isNewResultVector = false; + + isNewKeyValuePair = false; + } + + + /********************************************/ + /*********** DownwardProtocol IMPL **********/ + /********************************************/ + virtual void start(int protocol) { + if (protocol != 0) { + throw Error("Protocol version " + toString(protocol) + + " not supported"); + } + partitioner = factory->createPartitioner(*this); + } + + virtual void setBSPJob(vector<string> values) { + int len = values.size(); + BSPJobImpl* result = new BSPJobImpl(); + HADOOP_ASSERT(len % 2 == 0, "Odd length of job conf values"); + for(int i=0; i < len; i += 2) { + result->set(values[i], values[i+1]); + } + job = result; + } + + virtual void setInputTypes(string keyType, string valueType) { + inputKeyClass = new string(keyType); + inputValueClass = new string(valueType); + } + + virtual void setKeyValue(const string& _key, const string& _value) { + currentKey = _key; + currentValue = _value; + isNewKeyValuePair = true; + } + + /* private Method */ + void setupReaderWriter(bool pipedInput, bool pipedOutput) { + + if(logging)fprintf(stderr,"HamaPipes::BSPContextImpl::setupReaderWriter - pipedInput: %s pipedOutput: %s\n", + (pipedInput)?"true":"false",(pipedOutput)?"true":"false"); + + if (pipedInput && reader==NULL) { + reader = factory->createRecordReader(*this); + HADOOP_ASSERT((reader == NULL) == pipedInput, + pipedInput ? "RecordReader defined when not needed.": + "RecordReader not defined"); + + //if (reader != NULL) { + // value = new string(); + //} + } + + if (pipedOutput && writer==NULL) { + writer = factory->createRecordWriter(*this); + HADOOP_ASSERT((writer == NULL) == pipedOutput, + pipedOutput ?
"RecordWriter defined when not needed.": + "RecordWriter not defined"); + } + } + + virtual void runSetup(bool pipedInput, bool pipedOutput) { + setupReaderWriter(pipedInput,pipedOutput); + + if (bsp == NULL) + bsp = factory->createBSP(*this); + + if (bsp != NULL) { + hasTask = true; + bsp->setup(*this); + hasTask = false; + uplink->sendCMD(TASK_DONE); + } + } + + virtual void runBsp(bool pipedInput, bool pipedOutput) { + setupReaderWriter(pipedInput,pipedOutput); + + if (bsp == NULL) + bsp = factory->createBSP(*this); + + if (bsp != NULL) { + hasTask = true; + bsp->bsp(*this); + hasTask = false; + uplink->sendCMD(TASK_DONE); + } + } + + virtual void runCleanup(bool pipedInput, bool pipedOutput) { + setupReaderWriter(pipedInput,pipedOutput); + + if (bsp != NULL) { + hasTask = true; + bsp->cleanup(*this); + hasTask = false; + uplink->sendCMD(TASK_DONE); + } + } + + /********************************************/ + /******* Partitioner *******/ + /********************************************/ + virtual void runPartition(const string& key, const string& value, int32_t numTasks){ + if (partitioner != NULL) { + int part = partitioner->partition(key, value, numTasks); + uplink->sendCMD(PARTITION_RESPONSE, part); + } else { + if(logging)fprintf(stderr,"HamaPipes::BSPContextImpl::runPartition Partitioner is NULL!\n"); + } + } + + virtual void setNewResult(int32_t _value) { + resultInt = _value; + isNewResultInt = true; + } + + virtual void setNewResult(int64_t _value) { + resultLong = _value; + isNewResultLong = true; + } + + virtual void setNewResult(const string& _value) { + resultString = _value; + isNewResultString = true; + } + + virtual void setNewResult(vector _value) { + resultVector = _value; + isNewResultVector = true; + } + + virtual void close() { + pthread_mutex_lock(&mutexDone); + done = true; + hasTask = false; + pthread_mutex_unlock(&mutexDone); + if(logging)fprintf(stderr,"HamaPipes::BSPContextImpl::close - done: %s hasTask: %s\n", + 
(done)?"true":"false",(hasTask)?"true":"false"); + } + + virtual void abort() { + throw Error("Aborted by driver"); + } + + /********************************************/ + /************** TaskContext IMPL ************/ + /********************************************/ + + /** + * Get the BSPJob for the current task. + */ + virtual const BSPJob* getBSPJob() { + return job; + } + + /** + * Get the current key. + * @return the current key or NULL if called before the first map or reduce + */ + //virtual const string& getInputKey() { + // return key; + //} + + /** + * Get the current value. + * @return the current value or NULL if called before the first map or + * reduce + */ + //virtual const string& getInputValue() { + // return *value; + //} + + /** + * Register a counter with the given group and name. + */ + /* + virtual Counter* getCounter(const std::string& group, + const std::string& name) { + int id = registeredCounterIds.size(); + registeredCounterIds.push_back(id); + uplink->registerCounter(id, group, name); + return new Counter(id); + }*/ + + /** + * Increment the value of the counter with the given amount. + */ + virtual void incrementCounter(const string& group, const string& name, uint64_t amount) { + uplink->incrementCounter(group, name, amount); + } + + /********************************************/ + /************** BSPContext IMPL *************/ + /********************************************/ + + /** + * Access the InputSplit of the bsp. + */ + //virtual const string& getInputSplit() { + // return *inputSplit; + //} + + /** + * Get the name of the key class of the input to this task. + */ + virtual const string& getInputKeyClass() { + return *inputKeyClass; + } + + /** + * Get the name of the value class of the input to this task. + */ + virtual const string& getInputValueClass() { + return *inputValueClass; + } + + /** + * Send a data with a tag to another BSPSlave corresponding to hostname. 
+ * Messages sent by this method are not guaranteed to be received in a sent + * order. + */ + virtual void sendMessage(const string& peerName, const string& msg) { + string values[] = {peerName, msg}; + uplink->sendCMD(SEND_MSG,values, 2); + } + + /** + * @return A message from the peer's received messages queue (a FIFO). + */ + virtual const string& getCurrentMessage() { + uplink->sendCMD(GET_MSG); + + while (!isNewResultString) + protocol->nextEvent(); + + isNewResultString = false; + if(logging)fprintf(stderr,"HamaPipes::BSPContextImpl::getMessage - NewResultString: %s\n",resultString.c_str()); + return resultString; + } + + /** + * @return The number of messages in the peer's received messages queue. + */ + virtual int getNumCurrentMessages() { + uplink->sendCMD(GET_MSG_COUNT); + + while (!isNewResultInt) + protocol->nextEvent(); + + isNewResultInt = false; + return resultInt; + } + + /** + * Barrier Synchronization. + * + * Sends all the messages in the outgoing message queues to the corresponding + * remote peers. + */ + virtual void sync() { + uplink->sendCMD(SYNC); + } + + /** + * @return the name of this peer in the format "hostname:port". + */ + virtual const string& getPeerName() { + uplink->sendCMD(GET_PEERNAME,-1); + + while (!isNewResultString) + protocol->nextEvent(); + + if(logging)fprintf(stderr,"HamaPipes::BSPContextImpl::getPeerName - NewResultString: %s\n",resultString.c_str()); + isNewResultString = false; + return resultString; + } + + /** + * @return the name of n-th peer from sorted array by name. + */ + virtual const string& getPeerName(int index) { + uplink->sendCMD(GET_PEERNAME,index); + + while (!isNewResultString) + protocol->nextEvent(); + + if(logging)fprintf(stderr,"HamaPipes::BSPContextImpl::getPeerName - NewResultString: %s\n",resultString.c_str()); + isNewResultString = false; + return resultString; + } + + /** + * @return the names of all the peers executing tasks from the same job + * (including this peer). 
*/ + virtual vector<string> getAllPeerNames() { + uplink->sendCMD(GET_ALL_PEERNAME); + + while (!isNewResultVector) + protocol->nextEvent(); + + isNewResultVector = false; + return resultVector; + } + + /** + * @return the index of this peer from sorted array by name. + */ + virtual int getPeerIndex() { + uplink->sendCMD(GET_PEER_INDEX); + + while (!isNewResultInt) + protocol->nextEvent(); + + isNewResultInt = false; + return resultInt; + } + + /** + * @return the number of peers + */ + virtual int getNumPeers() { + uplink->sendCMD(GET_PEER_COUNT); + + while (!isNewResultInt) + protocol->nextEvent(); + + isNewResultInt = false; + return resultInt; + } + + /** + * @return the count of current super-step + */ + virtual long getSuperstepCount() { + uplink->sendCMD(GET_SUPERSTEP_COUNT); + + while (!isNewResultLong) + protocol->nextEvent(); + + isNewResultLong = false; + return resultLong; + } + + /** + * Clears all queues entries. + */ + virtual void clear() { + uplink->sendCMD(CLEAR); + } + + /** + * Writes a key/value pair to the output collector + */ + virtual void write(const string& key, const string& value) { + if (writer != NULL) { + writer->emit(key, value); + } else { + string values[] = {key, value}; + uplink->sendCMD(WRITE_KEYVALUE, values, 2); + } + } + + /** + * Deserializes the next input key value into the given objects; + */ + virtual bool readNext(string& _key, string& _value) { + uplink->sendCMD(READ_KEYVALUE); + + while (!isNewKeyValuePair) + protocol->nextEvent(); + + isNewKeyValuePair = false; + + _key = currentKey; + _value = currentValue; + + if (logging && _key.empty() && _value.empty()) + fprintf(stderr,"HamaPipes::BSPContextImpl::readNext - Empty KeyValuePair\n"); + + return (!_key.empty() && !_value.empty()); + } + + /** + * Closes the input and opens it right away, so that the file pointer is at + * the beginning again.
+ */ + virtual void reopenInput() { + uplink->sendCMD(REOPEN_INPUT); + } + + + /********************************************/ + /******* SequenceFileConnector IMPL *******/ + /********************************************/ + + /** + * Open SequenceFile with opion "r" or "w" + * @return the corresponding fileID + */ + virtual int sequenceFileOpen(const string& path, const string& option, + const string& keyType, const string& valueType) { + if (logging) + fprintf(stderr,"HamaPipes::BSPContextImpl::sequenceFileOpen - Path: %s\n",path.c_str()); + + if ( (option.compare("r")==0) || (option.compare("w")==0)) { + + string values[] = {path, option, keyType, valueType}; + uplink->sendCMD(SEQFILE_OPEN,values, 4); + + while (!isNewResultInt) + protocol->nextEvent(); + + isNewResultInt = false; + return resultInt; + } else { + fprintf(stderr,"HamaPipes::BSPContextImpl::sequenceFileOpen wrong option: %s!\n",option.c_str()); + return -1; //Error wrong option + } + } + + /** + * Read next key/value pair from the SequenceFile with fileID + */ + virtual bool sequenceFileReadNext(int fileID, string& _key, string& _value) { + + uplink->sendCMD(SEQFILE_READNEXT,fileID); + + while (!isNewKeyValuePair) + protocol->nextEvent(); + + isNewKeyValuePair = false; + + _key = currentKey; + _value = currentValue; + + if (logging && _key.empty() && _value.empty()) + fprintf(stderr,"HamaPipes::BSPContextImpl::sequenceFileReadNext - Empty KeyValuePair\n"); + + return (!_key.empty() && !_value.empty()); + } + + /** + * Append the next key/value pair to the SequenceFile with fileID + */ + virtual bool sequenceFileAppend(int fileID, const string& key, const string& value) { + string values[] = {key, value}; + uplink->sendCMD(SEQFILE_APPEND,fileID, values, 2); + + while (!isNewResultInt) + protocol->nextEvent(); + + isNewResultInt = false; + return (resultInt==1); + } + + /** + * Close SequenceFile + */ + virtual bool sequenceFileClose(int fileID) { + uplink->sendCMD(SEQFILE_CLOSE,fileID); + + while 
(!isNewResultInt) + protocol->nextEvent(); + + if (logging && resultInt==0) + fprintf(stderr,"HamaPipes::BSPContextImpl::sequenceFileClose - Nothing was closed!\n"); + else if (logging) + fprintf(stderr,"HamaPipes::BSPContextImpl::sequenceFileClose - File was successfully closed!\n"); + + isNewResultInt = false; + return (resultInt==1); + } + + /********************************************/ + /*************** Other STUFF ***************/ + /********************************************/ + + void setProtocol(Protocol* _protocol, UpwardProtocol* _uplink) { + protocol = _protocol; + uplink = _uplink; + } + + bool isDone() { + pthread_mutex_lock(&mutexDone); + bool doneCopy = done; + pthread_mutex_unlock(&mutexDone); + return doneCopy; + } + + /** + * Advance to the next value. + */ + /* + bool nextValue() { + if (isNewKey || done) { + return false; + } + isNewValue = false; + //progress(); + protocol->nextEvent(); + return isNewValue; + } + */ + void waitForTask() { + while (!done && !hasTask) { + if(logging)fprintf(stderr,"HamaPipes::BSPContextImpl::waitForTask - done: %s hasTask: %s\n", + (done)?"true":"false",(hasTask)?"true":"false"); + protocol->nextEvent(); + } + } + /* + bool nextKey() { + if (reader == NULL) { + while (!isNewKey) { + nextValue(); + if (done) { + return false; + } + } + key = *newKey; + } else { + if (!reader->next(key, const_cast<string&>(*value))) { + pthread_mutex_lock(&mutexDone); + done = true; + pthread_mutex_unlock(&mutexDone); + return false; + } + //progressFloat = reader->getProgress(); + } + isNewKey = false; + + if (bsp != NULL) { + bsp->bsp(*this); + } + return true; + } + */ + void closeAll() { + if (reader) { + reader->close(); + } + + if (bsp) { + bsp->close(); + } + + if (writer) { + writer->close(); + } + } + + virtual ~BSPContextImpl() { + delete job; + delete inputKeyClass; + delete inputValueClass; + //delete inputSplit; + //if (reader) { + // delete value; + //} + delete reader; + delete bsp; + delete writer; +
pthread_mutex_destroy(&mutexDone); + } + }; + + /** + * Ping the parent every 5 seconds to know if it is alive + */ + void* ping(void* ptr) { + BSPContextImpl* context = (BSPContextImpl*) ptr; + char* portStr = getenv("hama.pipes.command.port"); + int MAX_RETRIES = 3; + int remaining_retries = MAX_RETRIES; + while (!context->isDone()) { + try{ + sleep(5); + int sock = -1; + if (portStr) { + sock = socket(PF_INET, SOCK_STREAM, 0); + HADOOP_ASSERT(sock != - 1, + string("problem creating socket: ") + strerror(errno)); + sockaddr_in addr; + addr.sin_family = AF_INET; + addr.sin_port = htons(toInt(portStr)); + addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); + if(logging)fprintf(stderr,"HamaPipes::ping - connected to GroomServer Port: %s\n", portStr); + HADOOP_ASSERT(connect(sock, (sockaddr*) &addr, sizeof(addr)) == 0, + string("problem connecting command socket: ") + + strerror(errno)); + + } + if (sock != -1) { + int result = shutdown(sock, SHUT_RDWR); + HADOOP_ASSERT(result == 0, "problem shutting socket"); + result = close(sock); + HADOOP_ASSERT(result == 0, "problem closing socket"); + } + remaining_retries = MAX_RETRIES; + } catch (Error& err) { + if (!context->isDone()) { + fprintf(stderr, "Hama Pipes Exception: in ping %s\n", + err.getMessage().c_str()); + remaining_retries -= 1; + if (remaining_retries == 0) { + exit(1); + } + } else { + return NULL; + } + } + } + return NULL; + } + + /** + * Run the assigned task in the framework. + * The user's main function should set the various functions using the + * set* functions above and then call this. + * @return true, if the task succeeded. 
+ */ + bool runTask(const Factory& factory) { + try { + HADOOP_ASSERT(getenv("hama.pipes.logging")!=NULL,"No environment found!"); + + logging = (toInt(getenv("hama.pipes.logging"))==0)?false:true; + if(logging)fprintf(stderr,"HamaPipes::runTask - logging is: %s\n", (logging)?"true":"false"); + + BSPContextImpl* context = new BSPContextImpl(factory); + Protocol* connection; + + char* portStr = getenv("hama.pipes.command.port"); + int sock = -1; + FILE* stream = NULL; + FILE* outStream = NULL; + char *bufin = NULL; + char *bufout = NULL; + if (portStr) { + sock = socket(PF_INET, SOCK_STREAM, 0); + HADOOP_ASSERT(sock != - 1, + string("problem creating socket: ") + strerror(errno)); + sockaddr_in addr; + addr.sin_family = AF_INET; + addr.sin_port = htons(toInt(portStr)); + addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); + HADOOP_ASSERT(connect(sock, (sockaddr*) &addr, sizeof(addr)) == 0, + string("problem connecting command socket: ") + + strerror(errno)); + + stream = fdopen(sock, "r"); + outStream = fdopen(sock, "w"); + + // increase buffer size + int bufsize = 128*1024; + int setbuf; + bufin = new char[bufsize]; + bufout = new char[bufsize]; + setbuf = setvbuf(stream, bufin, _IOFBF, bufsize); + HADOOP_ASSERT(setbuf == 0, string("problem with setvbuf for inStream: ") + + strerror(errno)); + setbuf = setvbuf(outStream, bufout, _IOFBF, bufsize); + HADOOP_ASSERT(setbuf == 0, string("problem with setvbuf for outStream: ") + + strerror(errno)); + + connection = new BinaryProtocol(stream, context, outStream); + if(logging)fprintf(stderr,"HamaPipes::runTask - connected to GroomServer Port: %s\n", portStr); + + } else if (getenv("hama.pipes.command.file")) { + char* filename = getenv("hama.pipes.command.file"); + string outFilename = filename; + outFilename += ".out"; + stream = fopen(filename, "r"); + outStream = fopen(outFilename.c_str(), "w"); + connection = new BinaryProtocol(stream, context, outStream); + } else { + //connection = new TextProtocol(stdin, context, 
stdout); + fprintf(stderr,"HamaPipes::runTask - Connection couldn't be initialized!\n"); + return false; + } + + context->setProtocol(connection, connection->getUplink()); + + //pthread_t pingThread; + //pthread_create(&pingThread, NULL, ping, (void*)(context)); + + context->waitForTask(); + + //while (!context->isDone()) { + //context->nextKey(); + //} + + context->closeAll(); + connection->getUplink()->sendCMD(DONE); + + //pthread_join(pingThread,NULL); + + delete context; + delete connection; + if (stream != NULL) { + fflush(stream); + } + if (outStream != NULL) { + fflush(outStream); + } + fflush(stdout); + if (sock != -1) { + int result = shutdown(sock, SHUT_RDWR); + HADOOP_ASSERT(result == 0, "problem shutting socket"); + result = close(sock); + HADOOP_ASSERT(result == 0, "problem closing socket"); + } + if (stream != NULL) { + //fclose(stream); + } + if (outStream != NULL) { + //fclose(outStream); + } + delete[] bufin; + delete[] bufout; + return true; + } catch (Error& err) { + fprintf(stderr, "Hama Pipes Exception: %s\n", + err.getMessage().c_str()); + return false; + } + } +} + Added: hama/trunk/c++/pipes/impl/config.h.in URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/pipes/impl/config.h.in?rev=1465852&view=auto ============================================================================== --- hama/trunk/c++/pipes/impl/config.h.in (added) +++ hama/trunk/c++/pipes/impl/config.h.in Tue Apr 9 01:28:04 2013 @@ -0,0 +1,141 @@ +/* impl/config.h.in. Generated from configure.ac by autoheader. */ + +/* Define to 1 if you have the declaration of `strerror_r', and to 0 if you + don't. */ +#undef HAVE_DECL_STRERROR_R + +/* Define to 1 if you have the <dlfcn.h> header file. */ +#undef HAVE_DLFCN_H + +/* Define to 1 if you have the <inttypes.h> header file. */ +#undef HAVE_INTTYPES_H + +/* Define to 1 if you have the `pthread' library (-lpthread). */ +#undef HAVE_LIBPTHREAD + +/* Define to 1 if you have the `ssl' library (-lssl).
*/ +#undef HAVE_LIBSSL + +/* Define to 1 if you have the <memory.h> header file. */ +#undef HAVE_MEMORY_H + +/* Define to 1 if you have the `mkdir' function. */ +#undef HAVE_MKDIR + +/* Define to 1 if you have the <pthread.h> header file. */ +#undef HAVE_PTHREAD_H + +/* Define to 1 if stdbool.h conforms to C99. */ +#undef HAVE_STDBOOL_H + +/* Define to 1 if you have the <stdint.h> header file. */ +#undef HAVE_STDINT_H + +/* Define to 1 if you have the <stdlib.h> header file. */ +#undef HAVE_STDLIB_H + +/* Define to 1 if you have the `strerror_r' function. */ +#undef HAVE_STRERROR_R + +/* Define to 1 if you have the <strings.h> header file. */ +#undef HAVE_STRINGS_H + +/* Define to 1 if you have the <string.h> header file. */ +#undef HAVE_STRING_H + +/* Define to 1 if you have the <sys/stat.h> header file. */ +#undef HAVE_SYS_STAT_H + +/* Define to 1 if you have the <sys/types.h> header file. */ +#undef HAVE_SYS_TYPES_H + +/* Define to 1 if you have the `uname' function. */ +#undef HAVE_UNAME + +/* Define to 1 if you have the <unistd.h> header file. */ +#undef HAVE_UNISTD_H + +/* Define to 1 if the system has the type `_Bool'. */ +#undef HAVE__BOOL + +/* Name of package */ +#undef PACKAGE + +/* Define to the address where bug reports for this package should be sent. */ +#undef PACKAGE_BUGREPORT + +/* Define to the full name of this package. */ +#undef PACKAGE_NAME + +/* Define to the full name and version of this package. */ +#undef PACKAGE_STRING + +/* Define to the one symbol short name of this package. */ +#undef PACKAGE_TARNAME + +/* Define to the home page for this package. */ +#undef PACKAGE_URL + +/* Define to the version of this package. */ +#undef PACKAGE_VERSION + +/* Define to 1 if you have the ANSI C header files. */ +#undef STDC_HEADERS + +/* Define to 1 if strerror_r returns char *. */ +#undef STRERROR_R_CHAR_P + +/* Enable extensions on AIX 3, Interix. */ +#ifndef _ALL_SOURCE +# undef _ALL_SOURCE +#endif +/* Enable GNU extensions on systems that have them. */ +#ifndef _GNU_SOURCE +# undef _GNU_SOURCE +#endif +/* Enable threading extensions on Solaris.
*/ +#ifndef _POSIX_PTHREAD_SEMANTICS +# undef _POSIX_PTHREAD_SEMANTICS +#endif +/* Enable extensions on HP NonStop. */ +#ifndef _TANDEM_SOURCE +# undef _TANDEM_SOURCE +#endif +/* Enable general extensions on Solaris. */ +#ifndef __EXTENSIONS__ +# undef __EXTENSIONS__ +#endif + + +/* Version number of package */ +#undef VERSION + +/* Enable large inode numbers on Mac OS X 10.5. */ +#ifndef _DARWIN_USE_64_BIT_INODE +# define _DARWIN_USE_64_BIT_INODE 1 +#endif + +/* Number of bits in a file offset, on hosts where this is settable. */ +#undef _FILE_OFFSET_BITS + +/* Define for large files, on AIX-style hosts. */ +#undef _LARGE_FILES + +/* Define to 1 if on MINIX. */ +#undef _MINIX + +/* Define to 2 if the system does not provide POSIX.1 features except with + this defined. */ +#undef _POSIX_1_SOURCE + +/* Define to 1 if you need to in order for `stat' and other things to work. */ +#undef _POSIX_SOURCE + +/* Define to empty if `const' does not conform to ANSI C. */ +#undef const + +/* Define to `long int' if <sys/types.h> does not define. */ +#undef off_t + +/* Define to `unsigned int' if <sys/types.h> does not define. */ +#undef size_t Added: hama/trunk/c++/pipes/install-sh URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/pipes/install-sh?rev=1465852&view=auto ============================================================================== --- hama/trunk/c++/pipes/install-sh (added) +++ hama/trunk/c++/pipes/install-sh Tue Apr 9 01:28:04 2013 @@ -0,0 +1,323 @@ +#!/bin/sh +# install - install a program, script, or datafile + +scriptversion=2005-05-14.22 + +# This originates from X11R5 (mit/util/scripts/install.sh), which was +# later released in X11R6 (xc/config/util/install.sh) with the +# following copyright and license.
+# +# Copyright (C) 1994 X Consortium +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# X CONSORTIUM BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN +# AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNEC- +# TION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +# +# Except as contained in this notice, the name of the X Consortium shall not +# be used in advertising or otherwise to promote the sale, use or other deal- +# ings in this Software without prior written authorization from the X Consor- +# tium. +# +# +# FSF changes to this file are in the public domain. +# +# Calling this script install-sh is preferred over install.sh, to prevent +# `make' implicit rules from creating a file called install from it +# when there is no Makefile. +# +# This script is compatible with the BSD install script, but was written +# from scratch. It can only install one file at a time, a restriction +# shared with many OS's install programs. + +# set DOITPROG to echo to test this script + +# Don't use :- since 4.3BSD and earlier shells don't like it. +doit="${DOITPROG-}" + +# put in absolute paths if you don't have them in your path; or use env. vars. 
+ +mvprog="${MVPROG-mv}" +cpprog="${CPPROG-cp}" +chmodprog="${CHMODPROG-chmod}" +chownprog="${CHOWNPROG-chown}" +chgrpprog="${CHGRPPROG-chgrp}" +stripprog="${STRIPPROG-strip}" +rmprog="${RMPROG-rm}" +mkdirprog="${MKDIRPROG-mkdir}" + +chmodcmd="$chmodprog 0755" +chowncmd= +chgrpcmd= +stripcmd= +rmcmd="$rmprog -f" +mvcmd="$mvprog" +src= +dst= +dir_arg= +dstarg= +no_target_directory= + +usage="Usage: $0 [OPTION]... [-T] SRCFILE DSTFILE + or: $0 [OPTION]... SRCFILES... DIRECTORY + or: $0 [OPTION]... -t DIRECTORY SRCFILES... + or: $0 [OPTION]... -d DIRECTORIES... + +In the 1st form, copy SRCFILE to DSTFILE. +In the 2nd and 3rd, copy all SRCFILES to DIRECTORY. +In the 4th, create DIRECTORIES. + +Options: +-c (ignored) +-d create directories instead of installing files. +-g GROUP $chgrpprog installed files to GROUP. +-m MODE $chmodprog installed files to MODE. +-o USER $chownprog installed files to USER. +-s $stripprog installed files. +-t DIRECTORY install into DIRECTORY. +-T report an error if DSTFILE is a directory. +--help display this help and exit. +--version display version info and exit. + +Environment variables override the default commands: + CHGRPPROG CHMODPROG CHOWNPROG CPPROG MKDIRPROG MVPROG RMPROG STRIPPROG +" + +while test -n "$1"; do + case $1 in + -c) shift + continue;; + + -d) dir_arg=true + shift + continue;; + + -g) chgrpcmd="$chgrpprog $2" + shift + shift + continue;; + + --help) echo "$usage"; exit $?;; + + -m) chmodcmd="$chmodprog $2" + shift + shift + continue;; + + -o) chowncmd="$chownprog $2" + shift + shift + continue;; + + -s) stripcmd=$stripprog + shift + continue;; + + -t) dstarg=$2 + shift + shift + continue;; + + -T) no_target_directory=true + shift + continue;; + + --version) echo "$0 $scriptversion"; exit $?;; + + *) # When -d is used, all remaining arguments are directories to create. + # When -t is used, the destination is already specified. + test -n "$dir_arg$dstarg" && break + # Otherwise, the last argument is the destination. 
Remove it from $@. + for arg + do + if test -n "$dstarg"; then + # $@ is not empty: it contains at least $arg. + set fnord "$@" "$dstarg" + shift # fnord + fi + shift # arg + dstarg=$arg + done + break;; + esac +done + +if test -z "$1"; then + if test -z "$dir_arg"; then + echo "$0: no input file specified." >&2 + exit 1 + fi + # It's OK to call `install-sh -d' without argument. + # This can happen when creating conditional directories. + exit 0 +fi + +for src +do + # Protect names starting with `-'. + case $src in + -*) src=./$src ;; + esac + + if test -n "$dir_arg"; then + dst=$src + src= + + if test -d "$dst"; then + mkdircmd=: + chmodcmd= + else + mkdircmd=$mkdirprog + fi + else + # Waiting for this to be detected by the "$cpprog $src $dsttmp" command + # might cause directories to be created, which would be especially bad + # if $src (and thus $dsttmp) contains '*'. + if test ! -f "$src" && test ! -d "$src"; then + echo "$0: $src does not exist." >&2 + exit 1 + fi + + if test -z "$dstarg"; then + echo "$0: no destination specified." >&2 + exit 1 + fi + + dst=$dstarg + # Protect names starting with `-'. + case $dst in + -*) dst=./$dst ;; + esac + + # If destination is a directory, append the input filename; won't work + # if double slashes aren't ignored. + if test -d "$dst"; then + if test -n "$no_target_directory"; then + echo "$0: $dstarg: Is a directory" >&2 + exit 1 + fi + dst=$dst/`basename "$src"` + fi + fi + + # This sed command emulates the dirname command. + dstdir=`echo "$dst" | sed -e 's,/*$,,;s,[^/]*$,,;s,/*$,,;s,^$,.,'` + + # Make sure that the destination directory exists. + + # Skip lots of stat calls in the usual case. + if test ! -d "$dstdir"; then + defaultIFS=' + ' + IFS="${IFS-$defaultIFS}" + + oIFS=$IFS + # Some sh's can't handle IFS=/ for some reason. + IFS='%' + set x `echo "$dstdir" | sed -e 's@/@%@g' -e 's@^%@/@'` + shift + IFS=$oIFS + + pathcomp= + + while test $# -ne 0 ; do + pathcomp=$pathcomp$1 + shift + if test ! 
-d "$pathcomp"; then + $mkdirprog "$pathcomp" + # mkdir can fail with a `File exist' error in case several + # install-sh are creating the directory concurrently. This + # is OK. + test -d "$pathcomp" || exit + fi + pathcomp=$pathcomp/ + done + fi + + if test -n "$dir_arg"; then + $doit $mkdircmd "$dst" \ + && { test -z "$chowncmd" || $doit $chowncmd "$dst"; } \ + && { test -z "$chgrpcmd" || $doit $chgrpcmd "$dst"; } \ + && { test -z "$stripcmd" || $doit $stripcmd "$dst"; } \ + && { test -z "$chmodcmd" || $doit $chmodcmd "$dst"; } + + else + dstfile=`basename "$dst"` + + # Make a couple of temp file names in the proper directory. + dsttmp=$dstdir/_inst.$$_ + rmtmp=$dstdir/_rm.$$_ + + # Trap to clean up those temp files at exit. + trap 'ret=$?; rm -f "$dsttmp" "$rmtmp" && exit $ret' 0 + trap '(exit $?); exit' 1 2 13 15 + + # Copy the file name to the temp name. + $doit $cpprog "$src" "$dsttmp" && + + # and set any options; do chmod last to preserve setuid bits. + # + # If any of these fail, we abort the whole thing. If we want to + # ignore errors from any of these, just make sure not to ignore + # errors from the above "$doit $cpprog $src $dsttmp" command. + # + { test -z "$chowncmd" || $doit $chowncmd "$dsttmp"; } \ + && { test -z "$chgrpcmd" || $doit $chgrpcmd "$dsttmp"; } \ + && { test -z "$stripcmd" || $doit $stripcmd "$dsttmp"; } \ + && { test -z "$chmodcmd" || $doit $chmodcmd "$dsttmp"; } && + + # Now rename the file to the real destination. + { $doit $mvcmd -f "$dsttmp" "$dstdir/$dstfile" 2>/dev/null \ + || { + # The rename failed, perhaps because mv can't rename something else + # to itself, or perhaps because mv is so ancient that it does not + # support -f. + + # Now remove or move aside any old file at destination location. + # We try this two ways since rm can't unlink itself on some + # systems and the destination file might be busy for other + # reasons. In this case, the final cleanup might fail but the new + # file should still install successfully. 
+ { + if test -f "$dstdir/$dstfile"; then + $doit $rmcmd -f "$dstdir/$dstfile" 2>/dev/null \ + || $doit $mvcmd -f "$dstdir/$dstfile" "$rmtmp" 2>/dev/null \ + || { + echo "$0: cannot unlink or rename $dstdir/$dstfile" >&2 + (exit 1); exit 1 + } + else + : + fi + } && + + # Now rename the file to the real destination. + $doit $mvcmd "$dsttmp" "$dstdir/$dstfile" + } + } + fi || { (exit 1); exit 1; } +done + +# The final little trick to "correctly" pass the exit status to the exit trap. +{ + (exit 0); exit 0 +} + +# Local variables: +# eval: (add-hook 'write-file-hooks 'time-stamp) +# time-stamp-start: "scriptversion=" +# time-stamp-format: "%:y-%02m-%02d.%02H" +# time-stamp-end: "$" +# End:
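Reviewer note, not part of the commit: the script above relies on two techniques that are easy to miss in the diff. It emulates dirname with a sed expression, and it installs each file by copying to a temporary name in the destination directory, setting its mode, and only then renaming it into place. The sketch below isolates both under simplifying assumptions. The function names emulated_dirname and atomic_install are hypothetical and do not appear in install-sh, and the chown, chgrp, strip, and mv-fallback steps are left out.

```shell
#!/bin/sh
# Minimal sketch; function names are hypothetical, not part of install-sh.

# Emulate dirname with the same sed expression the script uses:
# strip trailing slashes, strip the last path component, strip
# trailing slashes again, and fall back to "." if nothing is left.
emulated_dirname() {
  echo "$1" | sed -e 's,/*$,,;s,[^/]*$,,;s,/*$,,;s,^$,.,'
}

# Install SRC as DST by way of a temp file in the same directory, so
# that a reader of DST never observes a partially written file.
# MODE defaults to 0755, matching the script's default chmodcmd.
atomic_install() {
  ai_src=$1
  ai_dst=$2
  ai_mode=${3-0755}
  ai_dir=`emulated_dirname "$ai_dst"`
  ai_tmp=$ai_dir/_inst.$$_

  # Copy first, chmod last, then rename. Any failure removes the
  # temp file and returns non-zero.
  cp "$ai_src" "$ai_tmp" &&
    chmod "$ai_mode" "$ai_tmp" &&
    mv -f "$ai_tmp" "$ai_dst" || { rm -f "$ai_tmp"; return 1; }
}
```

Assuming the destination directory already exists, a call such as atomic_install ./hello /tmp/out/hello 0644 would leave either the old file or the complete new one at the destination. The rename is what makes the replacement appear in a single step when the temp file and the destination are on the same filesystem, which is why the temp name is created in the destination directory.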