giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ni...@apache.org
Subject [25/51] [partial] GIRAPH-457: update module names (nitay)
Date Thu, 20 Dec 2012 04:25:32 GMT
http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
new file mode 100644
index 0000000..fefe9a0
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.giraph.zk.ZooKeeperExt;
+import org.apache.giraph.zk.ZooKeeperExt.PathStat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helper static methods for working with Writable objects.
+ */
+public class WritableUtils {
+  /**
+   * Don't construct.
+   */
+  private WritableUtils() { }
+
+  /**
+   * Read fields from byteArray to a Writeable object.
+   *
+   * @param byteArray Byte array to find the fields in.
+   * @param writableObject Object to fill in the fields.
+   */
+  public static void readFieldsFromByteArray(
+      byte[] byteArray, Writable writableObject) {
+    DataInputStream inputStream =
+      new DataInputStream(new ByteArrayInputStream(byteArray));
+    try {
+      writableObject.readFields(inputStream);
+    } catch (IOException e) {
+      throw new IllegalStateException(
+          "readFieldsFromByteArray: IOException", e);
+    }
+  }
+
+  /**
+   * Read fields from a ZooKeeper znode.
+   *
+   * @param zkExt ZooKeeper instance.
+   * @param zkPath Path of znode.
+   * @param watch Add a watch?
+   * @param stat Stat of znode if desired.
+   * @param writableObject Object to read into.
+   */
+  public static void readFieldsFromZnode(ZooKeeperExt zkExt,
+                                         String zkPath,
+                                         boolean watch,
+                                         Stat stat,
+                                         Writable writableObject) {
+    try {
+      byte[] zkData = zkExt.getData(zkPath, false, stat);
+      readFieldsFromByteArray(zkData, writableObject);
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+        "readFieldsFromZnode: KeeperException on " + zkPath, e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+        "readFieldsFromZnode: InterrruptedStateException on " + zkPath, e);
+    }
+  }
+
+  /**
+   * Write object to a byte array.
+   *
+   * @param writableObject Object to write from.
+   * @return Byte array with serialized object.
+   */
+  public static byte[] writeToByteArray(Writable writableObject) {
+    ByteArrayOutputStream outputStream =
+        new ByteArrayOutputStream();
+    DataOutput output = new DataOutputStream(outputStream);
+    try {
+      writableObject.write(output);
+    } catch (IOException e) {
+      throw new IllegalStateException(
+          "writeToByteArray: IOStateException", e);
+    }
+    return outputStream.toByteArray();
+  }
+
+  /**
+   * Read fields from byteArray to a Writeable object, skipping the size.
+   * Serialization method is choosable
+   *
+   * @param byteArray Byte array to find the fields in.
+   * @param writableObject Object to fill in the fields.
+   * @param unsafe Use unsafe deserialization
+   */
+  public static void readFieldsFromByteArrayWithSize(
+      byte[] byteArray, Writable writableObject, boolean unsafe) {
+    ExtendedDataInput extendedDataInput;
+    if (unsafe) {
+      extendedDataInput = new UnsafeByteArrayInputStream(byteArray);
+    } else {
+      extendedDataInput = new ExtendedByteArrayDataInput(byteArray);
+    }
+    try {
+      extendedDataInput.readInt();
+      writableObject.readFields(extendedDataInput);
+    } catch (IOException e) {
+      throw new IllegalStateException(
+          "readFieldsFromByteArrayWithSize: IOException", e);
+    }
+  }
+
+  /**
+   * Write object to a byte array with the first 4 bytes as the size of the
+   * entire buffer (including the size).
+   *
+   * @param writableObject Object to write from.
+   * @param unsafe Use unsafe serialization?
+   * @return Byte array with serialized object.
+   */
+  public static byte[] writeToByteArrayWithSize(Writable writableObject,
+                                                boolean unsafe) {
+    return writeToByteArrayWithSize(writableObject, null, unsafe);
+  }
+
+  /**
+   * Write object to a byte array with the first 4 bytes as the size of the
+   * entire buffer (including the size).
+   *
+   * @param writableObject Object to write from.
+   * @param buffer Use this buffer instead
+   * @param unsafe Use unsafe serialization?
+   * @return Byte array with serialized object.
+   */
+  public static byte[] writeToByteArrayWithSize(Writable writableObject,
+                                                byte[] buffer,
+                                                boolean unsafe) {
+    ExtendedDataOutput extendedDataOutput;
+    if (unsafe) {
+      extendedDataOutput = new UnsafeByteArrayOutputStream(buffer);
+    } else {
+      extendedDataOutput = new ExtendedByteArrayDataOutput(buffer);
+    }
+    try {
+      extendedDataOutput.writeInt(-1);
+      writableObject.write(extendedDataOutput);
+      extendedDataOutput.writeInt(0, extendedDataOutput.getPos());
+    } catch (IOException e) {
+      throw new IllegalStateException("writeToByteArrayWithSize: " +
+          "IOException", e);
+    }
+
+    return extendedDataOutput.getByteArray();
+  }
+
+  /**
+   * Write object to a ZooKeeper znode.
+   *
+   * @param zkExt ZooKeeper instance.
+   * @param zkPath Path of znode.
+   * @param version Version of the write.
+   * @param writableObject Object to write from.
+   * @return Path and stat information of the znode.
+   */
+  public static PathStat writeToZnode(ZooKeeperExt zkExt,
+                                      String zkPath,
+                                      int version,
+                                      Writable writableObject) {
+    try {
+      byte[] byteArray = writeToByteArray(writableObject);
+      return zkExt.createOrSetExt(zkPath,
+          byteArray,
+          Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT,
+          true,
+          version);
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+          "writeToZnode: KeeperException on " + zkPath, e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "writeToZnode: InterruptedException on " + zkPath, e);
+    }
+  }
+
+  /**
+   * Write list of object to a byte array.
+   *
+   * @param writableList List of object to write from.
+   * @return Byte array with serialized objects.
+   */
+  public static byte[] writeListToByteArray(
+      List<? extends Writable> writableList) {
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    DataOutput output = new DataOutputStream(outputStream);
+    try {
+      output.writeInt(writableList.size());
+      for (Writable writable : writableList) {
+        writable.write(output);
+      }
+    } catch (IOException e) {
+      throw new IllegalStateException(
+          "writeListToByteArray: IOException", e);
+    }
+    return outputStream.toByteArray();
+  }
+
+  /**
+   * Write list of objects to a ZooKeeper znode.
+   *
+   * @param zkExt ZooKeeper instance.
+   * @param zkPath Path of znode.
+   * @param version Version of the write.
+   * @param writableList List of objects to write from.
+   * @return Path and stat information of the znode.
+   */
+  public static PathStat writeListToZnode(
+      ZooKeeperExt zkExt,
+      String zkPath,
+      int version,
+      List<? extends Writable> writableList) {
+    try {
+      return zkExt.createOrSetExt(
+          zkPath,
+          writeListToByteArray(writableList),
+          Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT,
+          true,
+          version);
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+          "writeListToZnode: KeeperException on " + zkPath, e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "writeListToZnode: InterruptedException on " + zkPath, e);
+    }
+  }
+
+  /**
+   * Read fields from byteArray to a list of objects.
+   *
+   * @param byteArray Byte array to find the fields in.
+   * @param writableClass Class of the objects to instantiate.
+   * @param conf Configuration used for instantiation (i.e Configurable)
+   * @param <T> Object type
+   * @return List of objects.
+   */
+  public static <T extends Writable> List<T> readListFieldsFromByteArray(
+      byte[] byteArray,
+      Class<? extends T> writableClass,
+      Configuration conf) {
+    try {
+      DataInputStream inputStream =
+          new DataInputStream(new ByteArrayInputStream(byteArray));
+      int size = inputStream.readInt();
+      List<T> writableList = new ArrayList<T>(size);
+      for (int i = 0; i < size; ++i) {
+        T writable =
+            ReflectionUtils.newInstance(writableClass, conf);
+        writable.readFields(inputStream);
+        writableList.add(writable);
+      }
+      return writableList;
+    } catch (IOException e) {
+      throw new IllegalStateException(
+          "readListFieldsFromZnode: IOException", e);
+    }
+  }
+
+  /**
+   * Read fields from a ZooKeeper znode into a list of objects.
+   *
+   * @param zkExt ZooKeeper instance.
+   * @param zkPath Path of znode.
+   * @param watch Add a watch?
+   * @param stat Stat of znode if desired.
+   * @param writableClass Class of the objects to instantiate.
+   * @param conf Configuration used for instantiation (i.e Configurable)
+   * @param <T> Object type
+   * @return List of objects.
+   */
+  public static <T extends Writable> List<T> readListFieldsFromZnode(
+      ZooKeeperExt zkExt,
+      String zkPath,
+      boolean watch,
+      Stat stat,
+      Class<? extends T> writableClass,
+      Configuration conf) {
+    try {
+      byte[] zkData = zkExt.getData(zkPath, false, stat);
+      return WritableUtils.<T>readListFieldsFromByteArray(zkData,
+          writableClass, conf);
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+          "readListFieldsFromZnode: KeeperException on " + zkPath, e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "readListFieldsFromZnode: InterruptedException on " + zkPath,
+          e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/utils/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/package-info.java b/giraph-core/src/main/java/org/apache/giraph/utils/package-info.java
new file mode 100644
index 0000000..b7dc437
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of all generic utility classes.
+ */
+package org.apache.giraph.utils;

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/zk/BspEvent.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/zk/BspEvent.java b/giraph-core/src/main/java/org/apache/giraph/zk/BspEvent.java
new file mode 100644
index 0000000..7d22f9a
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/zk/BspEvent.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.zk;
+
+/**
+ * Synchronize on waiting for an event to have happened.  This is a one-time
+ * event.
+ */
+public interface BspEvent {
+  /**
+   * Reset the permanent signal.
+   */
+  void reset();
+
+  /**
+   * The event occurred and the occurrence has been logged for future
+   * waiters.
+   */
+  void signal();
+
+  /**
+   * Wait until the event occurred or waiting timed out.
+   * @param msecs Milliseconds to wait until the event occurred. 0 indicates
+   *        check immediately.  -1 indicates wait forever.
+   * @return true if event occurred, false if timed out while waiting
+   */
+  boolean waitMsecs(int msecs);
+
+  /**
+   * Wait indefinitely until the event occurs.
+   */
+  void waitForever();
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/zk/PredicateLock.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/zk/PredicateLock.java b/giraph-core/src/main/java/org/apache/giraph/zk/PredicateLock.java
new file mode 100644
index 0000000..eb61ea4
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/zk/PredicateLock.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.zk;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.giraph.utils.SystemTime;
+import org.apache.giraph.utils.Time;
+import org.apache.hadoop.util.Progressable;
+import org.apache.log4j.Logger;
+
+/**
+ * A lock with a predicate that was be used to synchronize events and keep the
+ * job context updated while waiting.
+ */
+public class PredicateLock implements BspEvent {
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(PredicateLock.class);
+  /** Default msecs to refresh the progress meter */
+  private static final int DEFAULT_MSEC_PERIOD = 10000;
+  /** Progressable for reporting progress (Job context) */
+  protected final Progressable progressable;
+  /** Actual mses to refresh the progress meter */
+  private final int msecPeriod;
+  /** Lock */
+  private Lock lock = new ReentrantLock();
+  /** Condition associated with lock */
+  private Condition cond = lock.newCondition();
+  /** Predicate */
+  private boolean eventOccurred = false;
+  /** Keeps track of the time */
+  private final Time time;
+
+  /**
+   * Constructor with default values.
+   *
+   * @param progressable used to report progress() (usually a Mapper.Context)
+   */
+  public PredicateLock(Progressable progressable) {
+    this(progressable, DEFAULT_MSEC_PERIOD, SystemTime.get());
+  }
+
+  /**
+   * Constructor.
+   *
+   * @param progressable used to report progress() (usually a Mapper.Context)
+   * @param msecPeriod Msecs between progress reports
+   * @param time Time implementation
+   */
+  public PredicateLock(Progressable progressable, int msecPeriod, Time time) {
+    this.progressable = progressable;
+    this.msecPeriod = msecPeriod;
+    this.time = time;
+  }
+
+  @Override
+  public void reset() {
+    lock.lock();
+    try {
+      eventOccurred = false;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  public void signal() {
+    lock.lock();
+    try {
+      eventOccurred = true;
+      cond.signalAll();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  public boolean waitMsecs(int msecs) {
+    if (msecs < 0) {
+      throw new RuntimeException("waitMsecs: msecs cannot be negative!");
+    }
+    long maxMsecs = time.getMilliseconds() + msecs;
+    int curMsecTimeout = 0;
+    lock.lock();
+    try {
+      while (!eventOccurred) {
+        curMsecTimeout =
+            Math.min(msecs, msecPeriod);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("waitMsecs: Wait for " + curMsecTimeout);
+        }
+        try {
+          boolean signaled =
+              cond.await(curMsecTimeout, TimeUnit.MILLISECONDS);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("waitMsecs: Got timed signaled of " +
+              signaled);
+          }
+        } catch (InterruptedException e) {
+          throw new IllegalStateException(
+            "waitMsecs: Caught interrupted " +
+            "exception on cond.await() " +
+            curMsecTimeout, e);
+        }
+        if (time.getMilliseconds() > maxMsecs) {
+          return false;
+        }
+        msecs = Math.max(0, msecs - curMsecTimeout);
+        progressable.progress(); // go around again
+      }
+    } finally {
+      lock.unlock();
+    }
+    return true;
+  }
+
+  @Override
+  public void waitForever() {
+    while (!waitMsecs(msecPeriod)) {
+      progressable.progress();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperExt.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperExt.java b/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperExt.java
new file mode 100644
index 0000000..4044fed
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperExt.java
@@ -0,0 +1,559 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.zk;
+
+import java.io.IOException;
+
+import org.apache.hadoop.util.Progressable;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+
+/**
+ * ZooKeeper provides only atomic operations.  ZooKeeperExt provides additional
+ * non-atomic operations that are useful.  It also provides wrappers to
+ * deal with ConnectionLossException.  All methods of this class
+ * should be thread-safe.
+ */
+public class ZooKeeperExt {
+  /** Internal logger */
+  private static final Logger LOG = Logger.getLogger(ZooKeeperExt.class);
+  /** Length of the ZK sequence number */
+  private static final int SEQUENCE_NUMBER_LENGTH = 10;
+  /** Internal ZooKeeper */
+  private final ZooKeeper zooKeeper;
+  /** Ensure we have progress */
+  private final Progressable progressable;
+  /** Number of max attempts to retry when failing due to connection loss */
+  private final int maxRetryAttempts;
+  /** Milliseconds to wait before trying again due to connection loss */
+  private final int retryWaitMsecs;
+
+  /**
+   * Constructor to connect to ZooKeeper, does not make progress
+   *
+   * @param connectString Comma separated host:port pairs, each corresponding
+   *        to a zk server. e.g.
+   *        "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" If the optional
+   *        chroot suffix is used the example would look
+   *        like: "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a"
+   *        where the client would be rooted at "/app/a" and all paths
+   *        would be relative to this root - ie getting/setting/etc...
+   *        "/foo/bar" would result in operations being run on
+   *        "/app/a/foo/bar" (from the server perspective).
+   * @param sessionTimeout Session timeout in milliseconds
+   * @param maxRetryAttempts Max retry attempts during connection loss
+   * @param retryWaitMsecs Msecs to wait when retrying due to connection
+   *        loss
+   * @param watcher A watcher object which will be notified of state changes,
+   *        may also be notified for node events
+   * @throws IOException
+   */
+  public ZooKeeperExt(String connectString,
+                      int sessionTimeout,
+                      int maxRetryAttempts,
+                      int retryWaitMsecs,
+                      Watcher watcher) throws IOException {
+    this(connectString, sessionTimeout, maxRetryAttempts,
+        retryWaitMsecs, watcher, null);
+  }
+
+  /**
+   * Constructor to connect to ZooKeeper, make progress
+   *
+   * @param connectString Comma separated host:port pairs, each corresponding
+   *        to a zk server. e.g.
+   *        "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" If the optional
+   *        chroot suffix is used the example would look
+   *        like: "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a"
+   *        where the client would be rooted at "/app/a" and all paths
+   *        would be relative to this root - ie getting/setting/etc...
+   *        "/foo/bar" would result in operations being run on
+   *        "/app/a/foo/bar" (from the server perspective).
+   * @param sessionTimeout Session timeout in milliseconds
+   * @param maxRetryAttempts Max retry attempts during connection loss
+   * @param retryWaitMsecs Msecs to wait when retrying due to connection
+   *        loss
+   * @param watcher A watcher object which will be notified of state changes,
+   *        may also be notified for node events
+   * @param progressable Makes progress for longer operations
+   * @throws IOException
+   */
+  public ZooKeeperExt(String connectString,
+      int sessionTimeout,
+      int maxRetryAttempts,
+      int retryWaitMsecs,
+      Watcher watcher,
+      Progressable progressable) throws IOException {
+    this.zooKeeper = new ZooKeeper(connectString, sessionTimeout, watcher);
+    this.progressable = progressable;
+    this.maxRetryAttempts = maxRetryAttempts;
+    this.retryWaitMsecs = retryWaitMsecs;
+  }
+
+  /**
+   * Provides a possibility of a creating a path consisting of more than one
+   * znode (not atomic).  If recursive is false, operates exactly the
+   * same as create().
+   *
+   * @param path path to create
+   * @param data data to set on the final znode
+   * @param acl acls on each znode created
+   * @param createMode only affects the final znode
+   * @param recursive if true, creates all ancestors
+   * @return Actual created path
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  public String createExt(
+      final String path,
+      byte[] data,
+      List<ACL> acl,
+      CreateMode createMode,
+      boolean recursive) throws KeeperException, InterruptedException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("createExt: Creating path " + path);
+    }
+
+    int attempt = 0;
+    while (attempt < maxRetryAttempts) {
+      try {
+        if (!recursive) {
+          return zooKeeper.create(path, data, acl, createMode);
+        }
+
+        try {
+          return zooKeeper.create(path, data, acl, createMode);
+        } catch (KeeperException.NoNodeException e) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("createExt: Cannot directly create node " + path);
+          }
+        }
+
+        int pos = path.indexOf("/", 1);
+        for (; pos != -1; pos = path.indexOf("/", pos + 1)) {
+          try {
+            if (progressable != null) {
+              progressable.progress();
+            }
+            zooKeeper.create(
+                path.substring(0, pos), null, acl, CreateMode.PERSISTENT);
+          } catch (KeeperException.NodeExistsException e) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("createExt: Znode " + path.substring(0, pos) +
+                  " already exists");
+            }
+          }
+        }
+        return zooKeeper.create(path, data, acl, createMode);
+      } catch (KeeperException.ConnectionLossException e) {
+        LOG.warn("createExt: Connection loss on attempt " + attempt + ", " +
+            "waiting " + retryWaitMsecs + " msecs before retrying.", e);
+      }
+      ++attempt;
+      Thread.sleep(retryWaitMsecs);
+    }
+    throw new IllegalStateException("createExt: Failed to create " + path  +
+        " after " + attempt + " tries!");
+  }
+
+  /**
+   * Data structure for handling the output of createOrSet()
+   */
+  public static class PathStat {
+    /** Path to created znode (if any) */
+    private String path;
+    /** Stat from set znode (if any) */
+    private Stat stat;
+
+    /**
+     * Put in results from createOrSet()
+     *
+     * @param path Path to created znode (or null)
+     * @param stat Stat from set znode (if set)
+     */
+    public PathStat(String path, Stat stat) {
+      this.path = path;
+      this.stat = stat;
+    }
+
+    /**
+     * Get the path of the created znode if it was created.
+     *
+     * @return Path of created znode or null if not created
+     */
+    public String getPath() {
+      return path;
+    }
+
+    /**
+     * Get the stat of the set znode if set
+     *
+     * @return Stat of set znode or null if not set
+     */
+    public Stat getStat() {
+      return stat;
+    }
+  }
+
+  /**
+   * Create a znode.  Set the znode if the created znode already exists.
+   *
+   * @param path path to create
+   * @param data data to set on the final znode
+   * @param acl acls on each znode created
+   * @param createMode only affects the final znode
+   * @param recursive if true, creates all ancestors
+   * @param version Version to set if setting
+   * @return Path of created znode or Stat of set znode
+   * @throws InterruptedException
+   * @throws KeeperException
+   */
+  public PathStat createOrSetExt(final String path,
+      byte[] data,
+      List<ACL> acl,
+      CreateMode createMode,
+      boolean recursive,
+      int version) throws KeeperException, InterruptedException {
+    String createdPath = null;
+    Stat setStat = null;
+    try {
+      createdPath = createExt(path, data, acl, createMode, recursive);
+    } catch (KeeperException.NodeExistsException e) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("createOrSet: Node exists on path " + path);
+      }
+      setStat = zooKeeper.setData(path, data, version);
+    }
+    return new PathStat(createdPath, setStat);
+  }
+
+  /**
+   * Create a znode if there is no other znode there
+   *
+   * @param path path to create
+   * @param data data to set on the final znode
+   * @param acl acls on each znode created
+   * @param createMode only affects the final znode
+   * @param recursive if true, creates all ancestors
+   * @return Path of created znode or Stat of set znode
+   * @throws InterruptedException
+   * @throws KeeperException
+   */
+  public PathStat createOnceExt(final String path,
+      byte[] data,
+      List<ACL> acl,
+      CreateMode createMode,
+      boolean recursive) throws KeeperException, InterruptedException {
+    String createdPath = null;
+    Stat setStat = null;
+    try {
+      createdPath = createExt(path, data, acl, createMode, recursive);
+    } catch (KeeperException.NodeExistsException e) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("createOnceExt: Node already exists on path " + path);
+      }
+    }
+    return new PathStat(createdPath, setStat);
+  }
+
+  /**
+   * Delete a path recursively.  When the deletion is recursive, it is a
+   * non-atomic operation, hence, not part of ZooKeeper.
+   * @param path path to remove (i.e. /tmp will remove /tmp/1 and /tmp/2)
+   * @param version expected version (-1 for all)
+   * @param recursive if true, remove all children, otherwise behave like
+   *        remove()
+   * @throws InterruptedException
+   * @throws KeeperException
+   */
+  public void deleteExt(final String path, int version, boolean recursive)
+    throws InterruptedException, KeeperException {
+    int attempt = 0;
+    while (attempt < maxRetryAttempts) {
+      try {
+        if (!recursive) {
+          zooKeeper.delete(path, version);
+          return;
+        }
+
+        try {
+          zooKeeper.delete(path, version);
+          return;
+        } catch (KeeperException.NotEmptyException e) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("deleteExt: Cannot directly remove node " + path);
+          }
+        }
+
+        List<String> childList = zooKeeper.getChildren(path, false);
+        for (String child : childList) {
+          if (progressable != null) {
+            progressable.progress();
+          }
+          deleteExt(path + "/" + child, -1, true);
+        }
+
+        zooKeeper.delete(path, version);
+        return;
+      } catch (KeeperException.ConnectionLossException e) {
+        LOG.warn("deleteExt: Connection loss on attempt " +
+            attempt + ", waiting " + retryWaitMsecs +
+            " msecs before retrying.", e);
+      }
+      ++attempt;
+      Thread.sleep(retryWaitMsecs);
+    }
+    throw new IllegalStateException("deleteExt: Failed to delete " + path  +
+        " after " + attempt + " tries!");
+  }
+
+  /**
+   * Return the stat of the node of the given path. Return null if no such a
+   * node exists.
+   * <p>
+   * If the watch is true and the call is successful (no exception is thrown),
+   * a watch will be left on the node with the given path. The watch will be
+   * triggered by a successful operation that creates/delete the node or sets
+   * the data on the node.
+   *
+   * @param path
+   *                the node path
+   * @param watch
+   *                whether need to watch this node
+   * @return the stat of the node of the given path; return null if no such a
+   *         node exists.
+   * @throws KeeperException If the server signals an error
+   * @throws InterruptedException If the server transaction is interrupted.
+   */
+  public Stat exists(String path, boolean watch) throws KeeperException,
+      InterruptedException {
+    int attempt = 0;
+    while (attempt < maxRetryAttempts) {
+      try {
+        return zooKeeper.exists(path, watch);
+      } catch (KeeperException.ConnectionLossException e) {
+        LOG.warn("exists: Connection loss on attempt " +
+            attempt + ", waiting " + retryWaitMsecs +
+            " msecs before retrying.", e);
+      }
+      ++attempt;
+      Thread.sleep(retryWaitMsecs);
+    }
+    throw new IllegalStateException("exists: Failed to check " + path  +
+        " after " + attempt + " tries!");
+  }
+
+  /**
+   * Return the stat of the node of the given path. Return null if no such a
+   * node exists.
+   * <p>
+   * If the watch is non-null and the call is successful (no exception is
+   * thrown), a watch will be left on the node with the given path. The
+   * watch will be triggered by a successful operation that
+   * creates/delete the node or sets the data on the node.
+   *
+   * @param path the node path
+   * @param watcher explicit watcher
+   * @return the stat of the node of the given path; return null if no such a
+   *         node exists.
+   * @throws KeeperException If the server signals an error
+   * @throws InterruptedException If the server transaction is interrupted.
+   * @throws IllegalArgumentException if an invalid path is specified
+   */
+  public Stat exists(final String path, Watcher watcher)
+    throws KeeperException, InterruptedException {
+    int attempt = 0;
+    while (attempt < maxRetryAttempts) {
+      try {
+        return zooKeeper.exists(path, watcher);
+      } catch (KeeperException.ConnectionLossException e) {
+        LOG.warn("exists: Connection loss on attempt " +
+            attempt + ", waiting " + retryWaitMsecs +
+            " msecs before retrying.", e);
+      }
+      ++attempt;
+      Thread.sleep(retryWaitMsecs);
+    }
+    throw new IllegalStateException("exists: Failed to check " + path  +
+        " after " + attempt + " tries!");
+  }
+
+  /**
+   * Return the data and the stat of the node of the given path.
+   * <p>
+   * If the watch is non-null and the call is successful (no exception is
+   * thrown), a watch will be left on the node with the given path. The watch
+   * will be triggered by a successful operation that sets data on the node, or
+   * deletes the node.
+   * <p>
+   * A KeeperException with error code KeeperException.NoNode will be thrown
+   * if no node with the given path exists.
+   *
+   * @param path the given path
+   * @param watcher explicit watcher
+   * @param stat the stat of the node
+   * @return the data of the node
+   * @throws KeeperException If the server signals an error with a non-zero
+   *         error code
+   * @throws InterruptedException If the server transaction is interrupted.
+   * @throws IllegalArgumentException if an invalid path is specified
+   */
+  public byte[] getData(final String path, Watcher watcher, Stat stat)
+    throws KeeperException, InterruptedException {
+    int attempt = 0;
+    while (attempt < maxRetryAttempts) {
+      try {
+        return zooKeeper.getData(path, watcher, stat);
+      } catch (KeeperException.ConnectionLossException e) {
+        LOG.warn("getData: Connection loss on attempt " +
+            attempt + ", waiting " + retryWaitMsecs +
+            " msecs before retrying.", e);
+      }
+      ++attempt;
+      Thread.sleep(retryWaitMsecs);
+    }
+    throw new IllegalStateException("getData: Failed to get " + path  +
+        " after " + attempt + " tries!");
+  }
+
+  /**
+   * Return the data and the stat of the node of the given path.
+   * <p>
+   * If the watch is true and the call is successful (no exception is
+   * thrown), a watch will be left on the node with the given path. The watch
+   * will be triggered by a successful operation that sets data on the node, or
+   * deletes the node.
+   * <p>
+   * A KeeperException with error code KeeperException.NoNode will be thrown
+   * if no node with the given path exists.
+   *
+   * @param path the given path
+   * @param watch whether need to watch this node
+   * @param stat the stat of the node
+   * @return the data of the node
+   * @throws KeeperException If the server signals an error with a non-zero
+   *         error code
+   * @throws InterruptedException If the server transaction is interrupted.
+   */
+  public byte[] getData(String path, boolean watch, Stat stat)
+    throws KeeperException, InterruptedException {
+    int attempt = 0;
+    while (attempt < maxRetryAttempts) {
+      try {
+        return zooKeeper.getData(path, watch, stat);
+      } catch (KeeperException.ConnectionLossException e) {
+        LOG.warn("getData: Connection loss on attempt " +
+            attempt + ", waiting " + retryWaitMsecs +
+            " msecs before retrying.", e);
+      }
+      ++attempt;
+      Thread.sleep(retryWaitMsecs);
+    }
+    throw new IllegalStateException("getData: Failed to get " + path  +
+        " after " + attempt + " tries!");
+  }
+
+  /**
+   * Get the children of the path with extensions.
+   * Extension 1: Sort the children based on sequence number
+   * Extension 2: Get the full path instead of relative path
+   *
+   * @param path path to znode
+   * @param watch set the watch?
+   * @param sequenceSorted sort by the sequence number
+   * @param fullPath if true, get the fully znode path back
+   * @return list of children
+   * @throws InterruptedException
+   * @throws KeeperException
+   */
+  public List<String> getChildrenExt(
+      final String path,
+      boolean watch,
+      boolean sequenceSorted,
+      boolean fullPath) throws KeeperException, InterruptedException {
+    int attempt = 0;
+    while (attempt < maxRetryAttempts) {
+      try {
+        List<String> childList = zooKeeper.getChildren(path, watch);
+        /* Sort children according to the sequence number, if desired */
+        if (sequenceSorted) {
+          Collections.sort(childList, new Comparator<String>() {
+            public int compare(String s1, String s2) {
+              if ((s1.length() <= SEQUENCE_NUMBER_LENGTH) ||
+                  (s2.length() <= SEQUENCE_NUMBER_LENGTH)) {
+                throw new RuntimeException(
+                    "getChildrenExt: Invalid length for sequence " +
+                        " sorting > " +
+                        SEQUENCE_NUMBER_LENGTH +
+                        " for s1 (" +
+                        s1.length() + ") or s2 (" + s2.length() + ")");
+              }
+              int s1sequenceNumber = Integer.parseInt(
+                  s1.substring(s1.length() -
+                      SEQUENCE_NUMBER_LENGTH));
+              int s2sequenceNumber = Integer.parseInt(
+                  s2.substring(s2.length() -
+                      SEQUENCE_NUMBER_LENGTH));
+              return s1sequenceNumber - s2sequenceNumber;
+            }
+          });
+        }
+        if (fullPath) {
+          List<String> fullChildList = new ArrayList<String>();
+          for (String child : childList) {
+            fullChildList.add(path + "/" + child);
+          }
+          return fullChildList;
+        }
+        return childList;
+      } catch (KeeperException.ConnectionLossException e) {
+        LOG.warn("getChildrenExt: Connection loss on attempt " +
+            attempt + ", waiting " + retryWaitMsecs +
+            " msecs before retrying.", e);
+      }
+      ++attempt;
+      Thread.sleep(retryWaitMsecs);
+    }
+    throw new IllegalStateException("createExt: Failed to create " + path  +
+        " after " + attempt + " tries!");
+  }
+
+  /**
+   * Close this client object. Once the client is closed, its session becomes
+   * invalid. All the ephemeral nodes in the ZooKeeper server associated with
+   * the session will be removed. The watches left on those nodes (and on
+   * their parents) will be triggered.
+   *
+   * @throws InterruptedException
+   */
+  public void close() throws InterruptedException {
+    zooKeeper.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java b/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
new file mode 100644
index 0000000..611a4bb
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
@@ -0,0 +1,914 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.zk;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.server.quorum.QuorumPeerMain;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.io.Closeables;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Writer;
+import java.net.ConnectException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.giraph.conf.GiraphConstants.BASE_ZNODE_KEY;
+
+
+/**
+ * Manages the election of ZooKeeper servers, starting/stopping the services,
+ * etc.
+ */
+public class ZooKeeperManager {
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(ZooKeeperManager.class);
+  /** Separates the hostname and task in the candidate stamp */
+  private static final String HOSTNAME_TASK_SEPARATOR = " ";
+  /** The ZooKeeperString filename prefix */
+  private static final String ZOOKEEPER_SERVER_LIST_FILE_PREFIX =
+      "zkServerList_";
+  /** Denotes that the computation is done for a partition */
+  private static final String COMPUTATION_DONE_SUFFIX = ".COMPUTATION_DONE";
+  /** Job context (mainly for progress) */
+  private Mapper<?, ?, ?, ?>.Context context;
+  /** Hadoop configuration */
+  private final ImmutableClassesGiraphConfiguration conf;
+  /** Task partition, to ensure uniqueness */
+  private final int taskPartition;
+  /** HDFS base directory for all file-based coordination */
+  private final Path baseDirectory;
+  /**
+   * HDFS task ZooKeeper candidate/completed
+   * directory for all file-based coordination
+   */
+  private final Path taskDirectory;
+  /**
+   * HDFS ZooKeeper server ready/done directory
+   * for all file-based coordination
+   */
+  private final Path serverDirectory;
+  /** HDFS path to whether the task is done */
+  private final Path myClosedPath;
+  /** Polling msecs timeout */
+  private final int pollMsecs;
+  /** Server count */
+  private final int serverCount;
+  /** File system */
+  private final FileSystem fs;
+  /** ZooKeeper process */
+  private Process zkProcess = null;
+  /** Thread that gets the zkProcess output */
+  private StreamCollector zkProcessCollector = null;
+  /** ZooKeeper local file system directory */
+  private final String zkDir;
+  /** ZooKeeper config file path */
+  private final String configFilePath;
+  /** ZooKeeper server list */
+  private final Map<String, Integer> zkServerPortMap = Maps.newTreeMap();
+  /** ZooKeeper base port */
+  private final int zkBasePort;
+  /** Final ZooKeeper server port list (for clients) */
+  private String zkServerPortString;
+  /** My hostname */
+  private String myHostname = null;
+  /** Job id, to ensure uniqueness */
+  private final String jobId;
+  /**
+   * Default local ZooKeeper prefix directory to use (where ZooKeeper server
+   * files will go)
+   */
+  private final String zkDirDefault;
+
+  /** State of the application */
+  public enum State {
+    /** Failure occurred */
+    FAILED,
+    /** Application finished */
+    FINISHED
+  }
+
+  /**
+   * Constructor with context.
+   *
+   * @param context Context to be stored internally
+   * @param configuration Configuration
+   * @throws IOException
+   */
+  public ZooKeeperManager(Mapper<?, ?, ?, ?>.Context context,
+                          ImmutableClassesGiraphConfiguration configuration)
+    throws IOException {
+    this.context = context;
+    this.conf = configuration;
+    taskPartition = conf.getTaskPartition();
+    jobId = conf.get("mapred.job.id", "Unknown Job");
+    baseDirectory =
+        new Path(conf.get(GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY,
+            getFinalZooKeeperPath()));
+    taskDirectory = new Path(baseDirectory,
+        "_task");
+    serverDirectory = new Path(baseDirectory,
+        "_zkServer");
+    myClosedPath = new Path(taskDirectory,
+        Integer.toString(taskPartition) +
+        COMPUTATION_DONE_SUFFIX);
+    pollMsecs = conf.getInt(
+        GiraphConstants.ZOOKEEPER_SERVERLIST_POLL_MSECS,
+        GiraphConstants.ZOOKEEPER_SERVERLIST_POLL_MSECS_DEFAULT);
+    serverCount = conf.getInt(
+        GiraphConstants.ZOOKEEPER_SERVER_COUNT,
+        GiraphConstants.ZOOKEEPER_SERVER_COUNT_DEFAULT);
+    String jobLocalDir = conf.get("job.local.dir");
+    if (jobLocalDir != null) { // for non-local jobs
+      zkDirDefault = jobLocalDir +
+          "/_bspZooKeeper";
+    } else {
+      zkDirDefault = System.getProperty("user.dir") + "/" +
+              GiraphConstants.ZOOKEEPER_MANAGER_DIR_DEFAULT;
+    }
+    zkDir = conf.get(GiraphConstants.ZOOKEEPER_DIR, zkDirDefault);
+    configFilePath = zkDir + "/zoo.cfg";
+    zkBasePort = conf.getInt(
+        GiraphConstants.ZOOKEEPER_SERVER_PORT,
+        GiraphConstants.ZOOKEEPER_SERVER_PORT_DEFAULT);
+
+
+    myHostname = InetAddress.getLocalHost().getCanonicalHostName();
+    fs = FileSystem.get(conf);
+  }
+
+  /**
+   * Generate the final ZooKeeper coordination directory on HDFS
+   *
+   * @return directory path with job id
+   */
+  private String getFinalZooKeeperPath() {
+    return GiraphConstants.ZOOKEEPER_MANAGER_DIR_DEFAULT + "/" + jobId;
+  }
+
+  /**
+   * Return the base ZooKeeper ZNode from which all other ZNodes Giraph creates
+   * should be sited, for instance in a multi-tenant ZooKeeper, the znode
+   * reserved for Giraph
+   *
+   * @param conf  Necessary to access user-provided values
+   * @return  String of path without trailing slash
+   */
+  public static String getBasePath(Configuration conf) {
+    String result = conf.get(BASE_ZNODE_KEY, "");
+    if (!result.equals("") && !result.startsWith("/")) {
+      throw new IllegalArgumentException("Value for " +
+          BASE_ZNODE_KEY + " must start with /: " + result);
+    }
+
+    return result;
+  }
+
+  /**
+   * Collects the output of a stream and dumps it to the log.
+   */
+  private static class StreamCollector extends Thread {
+    /** Number of last lines to keep */
+    private static final int LAST_LINES_COUNT = 100;
+    /** Class logger */
+    private static final Logger LOG = Logger.getLogger(StreamCollector.class);
+    /** Buffered reader of input stream */
+    private final BufferedReader bufferedReader;
+    /** Last lines (help to debug failures) */
+    private final LinkedList<String> lastLines = Lists.newLinkedList();
+    /**
+     * Constructor.
+     *
+     * @param is InputStream to dump to LOG.info
+     */
+    public StreamCollector(final InputStream is) {
+      super(StreamCollector.class.getName());
+      setDaemon(true);
+      InputStreamReader streamReader = new InputStreamReader(is);
+      bufferedReader = new BufferedReader(streamReader);
+    }
+
+    @Override
+    public void run() {
+      readLines();
+    }
+
+    /**
+     * Read all the lines from the bufferedReader.
+     */
+    private synchronized void readLines() {
+      String line;
+      try {
+        while ((line = bufferedReader.readLine()) != null) {
+          if (lastLines.size() > LAST_LINES_COUNT) {
+            lastLines.removeFirst();
+          }
+          lastLines.add(line);
+
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("readLines: " + line);
+          }
+        }
+      } catch (IOException e) {
+        LOG.error("readLines: Ignoring IOException", e);
+      }
+    }
+
+    /**
+     * Dump the last n lines of the collector.  Likely used in
+     * the case of failure.
+     *
+     * @param level Log level to dump with
+     */
+    public synchronized void dumpLastLines(Level level) {
+      // Get any remaining lines
+      readLines();
+      // Dump the lines to the screen
+      for (String line : lastLines) {
+        LOG.log(level, line);
+      }
+    }
+  }
+
+  /**
+   * Create the candidate stamps and decide on the servers to start if
+   * you are partition 0.
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public void setup() throws IOException, InterruptedException {
+    createCandidateStamp();
+    getZooKeeperServerList();
+  }
+
+  /**
+   * Create a HDFS stamp for this task.  If another task already
+   * created it, then this one will fail, which is fine.
+   */
+  public void createCandidateStamp() {
+    try {
+      fs.mkdirs(baseDirectory);
+      LOG.info("createCandidateStamp: Made the directory " +
+          baseDirectory);
+    } catch (IOException e) {
+      LOG.error("createCandidateStamp: Failed to mkdirs " +
+          baseDirectory);
+    }
+    // Check that the base directory exists and is a directory
+    try {
+      if (!fs.getFileStatus(baseDirectory).isDir()) {
+        throw new IllegalArgumentException(
+            "createCandidateStamp: " + baseDirectory +
+            " is not a directory, but should be.");
+      }
+    } catch (IOException e) {
+      throw new IllegalArgumentException(
+          "createCandidateStamp: Couldn't get file status " +
+              "for base directory " + baseDirectory + ".  If there is an " +
+              "issue with this directory, please set an accesible " +
+              "base directory with the Hadoop configuration option " +
+              GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY);
+    }
+
+    Path myCandidacyPath = new Path(
+        taskDirectory, myHostname +
+        HOSTNAME_TASK_SEPARATOR + taskPartition);
+    try {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("createCandidateStamp: Creating my filestamp " +
+            myCandidacyPath);
+      }
+      fs.createNewFile(myCandidacyPath);
+    } catch (IOException e) {
+      LOG.error("createCandidateStamp: Failed (maybe previous task " +
+          "failed) to create filestamp " + myCandidacyPath, e);
+    }
+  }
+
+  /**
+   * Every task must create a stamp to let the ZooKeeper servers know that
+   * they can shutdown.  This also lets the task know that it was already
+   * completed.
+   */
+  private void createZooKeeperClosedStamp() {
+    try {
+      LOG.info("createZooKeeperClosedStamp: Creating my filestamp " +
+          myClosedPath);
+      fs.createNewFile(myClosedPath);
+    } catch (IOException e) {
+      LOG.error("createZooKeeperClosedStamp: Failed (maybe previous task " +
+          "failed) to create filestamp " + myClosedPath);
+    }
+  }
+
+  /**
+   * Check if all the computation is done.
+   * @return true if all computation is done.
+   */
+  public boolean computationDone() {
+    try {
+      return fs.exists(myClosedPath);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Task 0 will call this to create the ZooKeeper server list.  The result is
+   * a file that describes the ZooKeeper servers through the filename.
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private void createZooKeeperServerList() throws IOException,
+      InterruptedException {
+    int candidateRetrievalAttempt = 0;
+    Map<String, Integer> hostnameTaskMap = Maps.newTreeMap();
+    while (true) {
+      FileStatus [] fileStatusArray = fs.listStatus(taskDirectory);
+      hostnameTaskMap.clear();
+      if (fileStatusArray.length > 0) {
+        for (FileStatus fileStatus : fileStatusArray) {
+          String[] hostnameTaskArray =
+              fileStatus.getPath().getName().split(
+                  HOSTNAME_TASK_SEPARATOR);
+          if (hostnameTaskArray.length != 2) {
+            throw new RuntimeException(
+                "getZooKeeperServerList: Task 0 failed " +
+                    "to parse " +
+                    fileStatus.getPath().getName());
+          }
+          if (!hostnameTaskMap.containsKey(hostnameTaskArray[0])) {
+            hostnameTaskMap.put(hostnameTaskArray[0],
+                new Integer(hostnameTaskArray[1]));
+          }
+        }
+        if (LOG.isInfoEnabled()) {
+          LOG.info("getZooKeeperServerList: Got " +
+              hostnameTaskMap.keySet() + " " +
+              hostnameTaskMap.size() + " hosts from " +
+              fileStatusArray.length + " candidates when " +
+              serverCount + " required (polling period is " +
+              pollMsecs + ") on attempt " +
+              candidateRetrievalAttempt);
+        }
+
+        if (hostnameTaskMap.size() >= serverCount) {
+          break;
+        }
+        ++candidateRetrievalAttempt;
+        Thread.sleep(pollMsecs);
+      }
+    }
+    StringBuffer serverListFile =
+        new StringBuffer(ZOOKEEPER_SERVER_LIST_FILE_PREFIX);
+    int numServers = 0;
+    for (Map.Entry<String, Integer> hostnameTask :
+      hostnameTaskMap.entrySet()) {
+      serverListFile.append(hostnameTask.getKey() +
+          HOSTNAME_TASK_SEPARATOR + hostnameTask.getValue() +
+          HOSTNAME_TASK_SEPARATOR);
+      if (++numServers == serverCount) {
+        break;
+      }
+    }
+    Path serverListPath =
+        new Path(baseDirectory, serverListFile.toString());
+    if (LOG.isInfoEnabled()) {
+      LOG.info("createZooKeeperServerList: Creating the final " +
+          "ZooKeeper file '" + serverListPath + "'");
+    }
+    fs.createNewFile(serverListPath);
+  }
+
+  /**
+   * Make an attempt to get the server list file by looking for a file in
+   * the appropriate directory with the prefix
+   * ZOOKEEPER_SERVER_LIST_FILE_PREFIX.
+   * @return null if not found or the filename if found
+   * @throws IOException
+   */
+  private String getServerListFile() throws IOException {
+    String serverListFile = null;
+    FileStatus [] fileStatusArray = fs.listStatus(baseDirectory);
+    for (FileStatus fileStatus : fileStatusArray) {
+      if (fileStatus.getPath().getName().startsWith(
+          ZOOKEEPER_SERVER_LIST_FILE_PREFIX)) {
+        serverListFile = fileStatus.getPath().getName();
+        break;
+      }
+    }
+    return serverListFile;
+  }
+
+  /**
+   * Task 0 is the designated master and will generate the server list
+   * (unless it has already done so).  Other
+   * tasks will consume the file after it is created (just the filename).
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private void getZooKeeperServerList() throws IOException,
+      InterruptedException {
+    String serverListFile;
+
+    if (taskPartition == 0) {
+      serverListFile = getServerListFile();
+      if (serverListFile == null) {
+        createZooKeeperServerList();
+      }
+    }
+
+    while (true) {
+      serverListFile = getServerListFile();
+      if (LOG.isInfoEnabled()) {
+        LOG.info("getZooKeeperServerList: For task " + taskPartition +
+            ", got file '" + serverListFile +
+            "' (polling period is " +
+            pollMsecs + ")");
+      }
+      if (serverListFile != null) {
+        break;
+      }
+      try {
+        Thread.sleep(pollMsecs);
+      } catch (InterruptedException e) {
+        LOG.warn("getZooKeeperServerList: Strange interrupted " +
+            "exception " + e.getMessage());
+      }
+
+    }
+
+    List<String> serverHostList = Arrays.asList(serverListFile.substring(
+        ZOOKEEPER_SERVER_LIST_FILE_PREFIX.length()).split(
+            HOSTNAME_TASK_SEPARATOR));
+    if (LOG.isInfoEnabled()) {
+      LOG.info("getZooKeeperServerList: Found " + serverHostList + " " +
+          serverHostList.size() +
+          " hosts in filename '" + serverListFile + "'");
+    }
+    if (serverHostList.size() != serverCount * 2) {
+      throw new IllegalStateException(
+          "getZooKeeperServerList: Impossible " +
+              " that " + serverHostList.size() +
+              " != 2 * " +
+              serverCount + " asked for.");
+    }
+
+    for (int i = 0; i < serverHostList.size(); i += 2) {
+      zkServerPortMap.put(serverHostList.get(i),
+        Integer.parseInt(serverHostList.get(i + 1)));
+    }
+    zkServerPortString = "";
+    for (String server : zkServerPortMap.keySet()) {
+      if (zkServerPortString.length() > 0) {
+        zkServerPortString += ",";
+      }
+      zkServerPortString += server + ":" + zkBasePort;
+    }
+  }
+
+  /**
+   * Users can get the server port string to connect to ZooKeeper
+   * @return server port string - comma separated
+   */
+  public String getZooKeeperServerPortString() {
+    return zkServerPortString;
+  }
+
+  /**
+   * Whoever is elected to be a ZooKeeper server must generate a config file
+   * locally.
+   *
+   * @param serverList List of ZooKeeper servers.
+   */
+  private void generateZooKeeperConfigFile(List<String> serverList) {
+    if (LOG.isInfoEnabled()) {
+      LOG.info("generateZooKeeperConfigFile: Creating file " +
+          configFilePath + " in " + zkDir + " with base port " +
+          zkBasePort);
+    }
+    try {
+      File zkDirFile = new File(this.zkDir);
+      boolean mkDirRet = zkDirFile.mkdirs();
+      if (LOG.isInfoEnabled()) {
+        LOG.info("generateZooKeeperConfigFile: Make directory of " +
+            zkDirFile.getName() + " = " + mkDirRet);
+      }
+      File configFile = new File(configFilePath);
+      boolean deletedRet = configFile.delete();
+      if (LOG.isInfoEnabled()) {
+        LOG.info("generateZooKeeperConfigFile: Delete of " +
+            configFile.getName() + " = " + deletedRet);
+      }
+      if (!configFile.createNewFile()) {
+        throw new IllegalStateException(
+            "generateZooKeeperConfigFile: Failed to " +
+                "create config file " + configFile.getName());
+      }
+      // Make writable by everybody
+      if (!configFile.setWritable(true, false)) {
+        throw new IllegalStateException(
+            "generateZooKeeperConfigFile: Failed to make writable " +
+                configFile.getName());
+      }
+
+      Writer writer = null;
+      try {
+        writer = new FileWriter(configFilePath);
+        writer.write("tickTime=" +
+            GiraphConstants.DEFAULT_ZOOKEEPER_TICK_TIME + "\n");
+        writer.write("dataDir=" + this.zkDir + "\n");
+        writer.write("clientPort=" + zkBasePort + "\n");
+        writer.write("maxClientCnxns=" +
+            GiraphConstants.DEFAULT_ZOOKEEPER_MAX_CLIENT_CNXNS +
+            "\n");
+        writer.write("minSessionTimeout=" +
+            conf.getZooKeeperMinSessionTimeout() + "\n");
+        writer.write("maxSessionTimeout=" +
+            conf.getZooKeeperMaxSessionTimeout() + "\n");
+        writer.write("initLimit=" +
+            GiraphConstants.DEFAULT_ZOOKEEPER_INIT_LIMIT + "\n");
+        writer.write("syncLimit=" +
+            GiraphConstants.DEFAULT_ZOOKEEPER_SYNC_LIMIT + "\n");
+        writer.write("snapCount=" +
+            GiraphConstants.DEFAULT_ZOOKEEPER_SNAP_COUNT + "\n");
+        writer.write("forceSync=" + conf.getZooKeeperForceSync() + "\n");
+        writer.write("skipACL=" + conf.getZooKeeperSkipAcl() + "\n");
+        if (serverList.size() != 1) {
+          writer.write("electionAlg=0\n");
+          for (int i = 0; i < serverList.size(); ++i) {
+            writer.write("server." + i + "=" + serverList.get(i) +
+                ":" + (zkBasePort + 1) +
+                ":" + (zkBasePort + 2) + "\n");
+            if (myHostname.equals(serverList.get(i))) {
+              Writer myidWriter = null;
+              try {
+                myidWriter = new FileWriter(zkDir + "/myid");
+                myidWriter.write(i + "\n");
+              } finally {
+                Closeables.closeQuietly(myidWriter);
+              }
+            }
+          }
+        }
+      } finally {
+        Closeables.closeQuietly(writer);
+      }
+    } catch (IOException e) {
+      throw new IllegalStateException(
+          "generateZooKeeperConfigFile: Failed to write file", e);
+    }
+  }
+
+  /**
+   * If this task has been selected, online a ZooKeeper server.  Otherwise,
+   * wait until this task knows that the ZooKeeper servers have been onlined.
+   */
+  public void onlineZooKeeperServers() {
+    Integer taskId = zkServerPortMap.get(myHostname);
+    if ((taskId != null) && (taskId.intValue() == taskPartition)) {
+      File zkDirFile = new File(this.zkDir);
+      try {
+        if (LOG.isInfoEnabled()) {
+          LOG.info("onlineZooKeeperServers: Trying to delete old " +
+              "directory " + this.zkDir);
+        }
+        FileUtils.deleteDirectory(zkDirFile);
+      } catch (IOException e) {
+        LOG.warn("onlineZooKeeperServers: Failed to delete " +
+            "directory " + this.zkDir, e);
+      }
+      generateZooKeeperConfigFile(
+          new ArrayList<String>(zkServerPortMap.keySet()));
+      ProcessBuilder processBuilder = new ProcessBuilder();
+      List<String> commandList = Lists.newArrayList();
+      String javaHome = System.getProperty("java.home");
+      if (javaHome == null) {
+        throw new IllegalArgumentException(
+            "onlineZooKeeperServers: java.home is not set!");
+      }
+      commandList.add(javaHome + "/bin/java");
+      String zkJavaOptsString =
+          conf.get(GiraphConstants.ZOOKEEPER_JAVA_OPTS,
+              GiraphConstants.ZOOKEEPER_JAVA_OPTS_DEFAULT);
+      String[] zkJavaOptsArray = zkJavaOptsString.split(" ");
+      if (zkJavaOptsArray != null) {
+        commandList.addAll(Arrays.asList(zkJavaOptsArray));
+      }
+      commandList.add("-cp");
+      Path fullJarPath = new Path(conf.get(GiraphConstants.ZOOKEEPER_JAR));
+      commandList.add(fullJarPath.toString());
+      commandList.add(QuorumPeerMain.class.getName());
+      commandList.add(configFilePath);
+      processBuilder.command(commandList);
+      File execDirectory = new File(zkDir);
+      processBuilder.directory(execDirectory);
+      processBuilder.redirectErrorStream(true);
+      if (LOG.isInfoEnabled()) {
+        LOG.info("onlineZooKeeperServers: Attempting to " +
+            "start ZooKeeper server with command " + commandList +
+            " in directory " + execDirectory.toString());
+      }
+      try {
+        synchronized (this) {
+          zkProcess = processBuilder.start();
+          zkProcessCollector =
+              new StreamCollector(zkProcess.getInputStream());
+          zkProcessCollector.start();
+        }
+        Runnable runnable = new Runnable() {
+          public void run() {
+            LOG.info("run: Shutdown hook started.");
+            synchronized (this) {
+              if (zkProcess != null) {
+                LOG.warn("onlineZooKeeperServers: " +
+                         "Forced a shutdown hook kill of the " +
+                         "ZooKeeper process.");
+                zkProcess.destroy();
+                int exitCode = -1;
+                try {
+                  exitCode = zkProcess.waitFor();
+                } catch (InterruptedException e) {
+                  LOG.warn("run: Couldn't get exit code.");
+                }
+                LOG.info("onlineZooKeeperServers: ZooKeeper process exited " +
+                    "with " + exitCode + " (note that 143 " +
+                    "typically means killed).");
+              }
+            }
+          }
+        };
+        Runtime.getRuntime().addShutdownHook(new Thread(runnable));
+        LOG.info("onlineZooKeeperServers: Shutdown hook added.");
+      } catch (IOException e) {
+        LOG.error("onlineZooKeeperServers: Failed to start " +
+            "ZooKeeper process", e);
+        throw new RuntimeException(e);
+      }
+
+      // Once the server is up and running, notify that this server is up
+      // and running by dropping a ready stamp.
+      int connectAttempts = 0;
+      final int maxConnectAttempts =
+          conf.getZookeeperConnectionAttempts();
+      while (connectAttempts < maxConnectAttempts) {
+        try {
+          if (LOG.isInfoEnabled()) {
+            LOG.info("onlineZooKeeperServers: Connect attempt " +
+                connectAttempts + " of " +
+                maxConnectAttempts +
+                " max trying to connect to " +
+                myHostname + ":" + zkBasePort +
+                " with poll msecs = " + pollMsecs);
+          }
+          InetSocketAddress zkServerAddress =
+              new InetSocketAddress(myHostname, zkBasePort);
+          Socket testServerSock = new Socket();
+          testServerSock.connect(zkServerAddress, 5000);
+          if (LOG.isInfoEnabled()) {
+            LOG.info("onlineZooKeeperServers: Connected to " +
+                zkServerAddress + "!");
+          }
+          break;
+        } catch (SocketTimeoutException e) {
+          LOG.warn("onlineZooKeeperServers: Got " +
+              "SocketTimeoutException", e);
+        } catch (ConnectException e) {
+          LOG.warn("onlineZooKeeperServers: Got " +
+              "ConnectException", e);
+        } catch (IOException e) {
+          LOG.warn("onlineZooKeeperServers: Got " +
+              "IOException", e);
+        }
+
+        ++connectAttempts;
+        try {
+          Thread.sleep(pollMsecs);
+        } catch (InterruptedException e) {
+          LOG.warn("onlineZooKeeperServers: Sleep of " + pollMsecs +
+              " interrupted - " + e.getMessage());
+        }
+      }
+      if (connectAttempts == maxConnectAttempts) {
+        throw new IllegalStateException(
+            "onlineZooKeeperServers: Failed to connect in " +
+                connectAttempts + " tries!");
+      }
+      Path myReadyPath = new Path(
+          serverDirectory, myHostname +
+          HOSTNAME_TASK_SEPARATOR + taskPartition);
+      try {
+        if (LOG.isInfoEnabled()) {
+          LOG.info("onlineZooKeeperServers: Creating my filestamp " +
+              myReadyPath);
+        }
+        fs.createNewFile(myReadyPath);
+      } catch (IOException e) {
+        LOG.error("onlineZooKeeperServers: Failed (maybe previous " +
+            "task failed) to create filestamp " + myReadyPath, e);
+      }
+    } else {
+      List<String> foundList = new ArrayList<String>();
+      int readyRetrievalAttempt = 0;
+      while (true) {
+        try {
+          FileStatus [] fileStatusArray =
+              fs.listStatus(serverDirectory);
+          foundList.clear();
+          if ((fileStatusArray != null) &&
+              (fileStatusArray.length > 0)) {
+            for (int i = 0; i < fileStatusArray.length; ++i) {
+              String[] hostnameTaskArray =
+                  fileStatusArray[i].getPath().getName().split(
+                      HOSTNAME_TASK_SEPARATOR);
+              if (hostnameTaskArray.length != 2) {
+                throw new RuntimeException(
+                    "getZooKeeperServerList: Task 0 failed " +
+                        "to parse " +
+                        fileStatusArray[i].getPath().getName());
+              }
+              foundList.add(hostnameTaskArray[0]);
+            }
+            if (LOG.isInfoEnabled()) {
+              LOG.info("onlineZooKeeperServers: Got " +
+                  foundList + " " +
+                  foundList.size() + " hosts from " +
+                  fileStatusArray.length +
+                  " ready servers when " +
+                  serverCount +
+                  " required (polling period is " +
+                  pollMsecs + ") on attempt " +
+                  readyRetrievalAttempt);
+            }
+            if (foundList.containsAll(zkServerPortMap.keySet())) {
+              break;
+            }
+          } else {
+            if (LOG.isInfoEnabled()) {
+              LOG.info("onlineZooKeeperSErvers: Empty " +
+                  "directory " + serverDirectory +
+                  ", waiting " + pollMsecs + " msecs.");
+            }
+          }
+          Thread.sleep(pollMsecs);
+          ++readyRetrievalAttempt;
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        } catch (InterruptedException e) {
+          LOG.warn("onlineZooKeeperServers: Strange interrupt from " +
+              e.getMessage(), e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Wait for all map tasks to signal completion.
+   *
+   * @param totalMapTasks Number of map tasks to wait for
+   */
+  private void waitUntilAllTasksDone(int totalMapTasks) {
+    int attempt = 0;
+    while (true) {
+      try {
+        FileStatus [] fileStatusArray =
+            fs.listStatus(taskDirectory);
+        int totalDone = 0;
+        if (fileStatusArray.length > 0) {
+          for (int i = 0; i < fileStatusArray.length; ++i) {
+            if (fileStatusArray[i].getPath().getName().endsWith(
+                COMPUTATION_DONE_SUFFIX)) {
+              ++totalDone;
+            }
+          }
+        }
+        if (LOG.isInfoEnabled()) {
+          LOG.info("waitUntilAllTasksDone: Got " + totalDone +
+              " and " + totalMapTasks +
+              " desired (polling period is " +
+              pollMsecs + ") on attempt " +
+              attempt);
+        }
+        if (totalDone >= totalMapTasks) {
+          break;
+        }
+        ++attempt;
+        Thread.sleep(pollMsecs);
+        context.progress();
+      } catch (IOException e) {
+        LOG.warn("waitUntilAllTasksDone: Got IOException.", e);
+      } catch (InterruptedException e) {
+        LOG.warn("waitUntilAllTasksDone: Got InterruptedException", e);
+      }
+    }
+  }
+
+  /**
+   * Notify the ZooKeeper servers that this partition is done with all
+   * ZooKeeper communication.  If this task is running a ZooKeeper server,
+   * kill it when all partitions are done and wait for
+   * completion.  Clean up the ZooKeeper local directory as well.
+   *
+   * @param state State of the application
+   */
+  public void offlineZooKeeperServers(State state) {
+    if (state == State.FINISHED) {
+      createZooKeeperClosedStamp();
+    }
+    synchronized (this) {
+      if (zkProcess != null) {
+        int totalMapTasks = conf.getMapTasks();
+        waitUntilAllTasksDone(totalMapTasks);
+        zkProcess.destroy();
+        int exitValue = -1;
+        File zkDirFile;
+        try {
+          zkProcessCollector.join();
+          exitValue = zkProcess.waitFor();
+          zkDirFile = new File(zkDir);
+          FileUtils.deleteDirectory(zkDirFile);
+        } catch (InterruptedException e) {
+          LOG.warn("offlineZooKeeperServers: " +
+              "InterruptedException, but continuing ",
+              e);
+        } catch (IOException e) {
+          LOG.warn("offlineZooKeeperSevers: " +
+              "IOException, but continuing",
+              e);
+        }
+        if (LOG.isInfoEnabled()) {
+          LOG.info("offlineZooKeeperServers: waitFor returned " +
+              exitValue + " and deleted directory " + zkDir);
+        }
+        zkProcess = null;
+      }
+    }
+  }
+
+  /**
+   *  Is this task running a ZooKeeper server?  Only could be true if called
+   *  after onlineZooKeeperServers().
+   *
+   *  @return true if running a ZooKeeper server, false otherwise
+   */
+  public boolean runsZooKeeper() {
+    synchronized (this) {
+      return zkProcess != null;
+    }
+  }
+
+  /**
+   * Log the zookeeper output from the process (if it was started)
+   *
+   * @param level Log level to print at
+   */
+  public void logZooKeeperOutput(Level level) {
+    if (zkProcessCollector != null) {
+      LOG.log(level, "logZooKeeperOutput: Dumping up to last " +
+          StreamCollector.LAST_LINES_COUNT +
+          " lines of the ZooKeeper process STDOUT and STDERR.");
+      zkProcessCollector.dumpLastLines(level);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/zk/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/zk/package-info.java b/giraph-core/src/main/java/org/apache/giraph/zk/package-info.java
new file mode 100644
index 0000000..e1dcf52
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/zk/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of {@link org.apache.zookeeper.ZooKeeper} related objects.
+ */
+package org.apache.giraph.zk;

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/site/resources/css/site.css
----------------------------------------------------------------------
diff --git a/giraph-core/src/site/resources/css/site.css b/giraph-core/src/site/resources/css/site.css
new file mode 100644
index 0000000..bd3e914
--- /dev/null
+++ b/giraph-core/src/site/resources/css/site.css
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#bannerLeft {
+    margin-left:1em;
+    margin-top:1em;
+    float:left;
+    width:auto;
+}
+
+#banner img {
+    float:right;
+    width:100px;
+    margin:0.25em;
+}


Mime
View raw message