incubator-hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tjungb...@apache.org
Subject svn commit: r1186921 [1/2] - in /incubator/hama/trunk: ./ core/conf/ core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/bsp/sync/ core/src/main/java/org/apache/hama/bsp/sync/rpc/ core/src/main/java/org/apache/hama/bsp/sync/zooke...
Date Thu, 20 Oct 2011 16:58:19 GMT
Author: tjungblut
Date: Thu Oct 20 16:58:17 2011
New Revision: 1186921

URL: http://svn.apache.org/viewvc?rev=1186921&view=rev
Log:
[HAMA-457]: Refactoring of BSPPeerImpl

Added:
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageSerializer.java   (with props)
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPSerializableMessage.java   (with props)
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncClient.java   (with props)
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncServer.java   (with props)
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncServerRunner.java   (with props)
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncServiceFactory.java   (with props)
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/rpc/
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/rpc/RPCSyncClientImpl.java   (with props)
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/rpc/RPCSyncServer.java   (with props)
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/rpc/RPCSyncServerImpl.java   (with props)
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/zookeeper/
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/zookeeper/ZooKeeperSyncClientImpl.java   (with props)
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/zookeeper/ZooKeeperSyncServerImpl.java   (with props)
    incubator/hama/trunk/core/src/main/java/org/apache/hama/util/BSPNetUtils.java   (with props)
    incubator/hama/trunk/core/src/main/java/org/apache/hama/util/StringArrayWritable.java   (with props)
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestLocalRunner.java   (with props)
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/sync/
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/sync/TestSyncServiceFactory.java   (with props)
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/sync/rpc/
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/sync/rpc/TestRPCSynchronization.java   (with props)
Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/core/conf/hama-default.xml
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/checkpoint/CheckpointRunner.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/checkpoint/Checkpointer.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/zookeeper/QuorumPeer.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/HamaTestCase.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/BSPSerializerWrapper.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/checkpoint/TestCheckpoint.java
    incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java
    incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPPeerImpl.java
    incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/StringArrayWritable.java
    incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/SyncServer.java
    incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/SyncServerImpl.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1186921&r1=1186920&r2=1186921&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Thu Oct 20 16:58:17 2011
@@ -20,7 +20,7 @@ Release 0.4 - Unreleased
     HAMA-421: Maven build issues using proxy (Joe Crobak via edwardyoon)
 
   IMPROVEMENTS
-   
+    HAMA-457: Refactoring of BSPPeerImpl (tjungblut)
     HAMA-448: Restructure BSP API (Thomas Jungblut via edwardyoon)
     HAMA-441: Logging tasks to distinct files (Thomas Jungblut)
     HAMA-423: Improve and Refactor Partitioning in the Examples (Thomas Jungblut via edwardyoon)

Modified: incubator/hama/trunk/core/conf/hama-default.xml
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/conf/hama-default.xml?rev=1186921&r1=1186920&r2=1186921&view=diff
==============================================================================
--- incubator/hama/trunk/core/conf/hama-default.xml (original)
+++ incubator/hama/trunk/core/conf/hama-default.xml Thu Oct 20 16:58:17 2011
@@ -99,6 +99,20 @@
     <description>The maximum number of BSP tasks that will be run simultaneously 
     by a groom server.</description>
   </property>
+  
+  <property>
+    <name>hama.sync.server.class</name>
+    <value>org.apache.hama.bsp.sync.zookeeper.ZooKeeperSyncServerImpl</value>
+    <description>The server class which is used for the barrier synchronization. 
+    This has only effects in YARN module.</description>
+  </property>
+  
+  <property>
+    <name>hama.sync.client.class</name>
+    <value>org.apache.hama.bsp.sync.zookeeper.ZooKeeperSyncClientImpl</value>
+    <description>The client class which is used for the 
+    barrier synchronization.</description>
+  </property>
 
   <!--
   Beginning of properties that are directly mapped from ZooKeeper's zoo.cfg.

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageSerializer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageSerializer.java?rev=1186921&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageSerializer.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageSerializer.java Thu Oct 20 16:58:17 2011
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+
+public class BSPMessageSerializer {
+
+  private static final Log LOG = LogFactory.getLog(BSPMessageSerializer.class);
+
+  final Socket client;
+  final ScheduledExecutorService sched;
+  final Configuration conf;
+
+  public BSPMessageSerializer(final Configuration conf, final int port) {
+    this.conf = conf;
+    Socket tmp = null;
+    int cnt = 0;
+    do {
+      tmp = init(port);
+      cnt++;
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException ie) {
+        LOG.warn("Thread is interrupted.", ie);
+        Thread.currentThread().interrupt();
+      }
+    } while (null == tmp && 10 > cnt);
+    this.client = tmp;
+    if (null == this.client)
+      throw new NullPointerException("Client socket is null.");
+    this.sched = Executors.newScheduledThreadPool(conf.getInt(
+        "bsp.checkpoint.serializer_thread", 10));
+    LOG.info(BSPMessageSerializer.class.getName()
+        + " is ready to serialize message.");
+  }
+
+  private Socket init(final int port) {
+    Socket tmp = null;
+    try {
+      tmp = new Socket("localhost", port);
+    } catch (UnknownHostException uhe) {
+      LOG.error("Unable to connect to BSPMessageDeserializer.", uhe);
+    } catch (IOException ioe) {
+      LOG.warn("Fail to create socket.", ioe);
+    }
+    return tmp;
+  }
+
+  void serialize(final BSPSerializableMessage tmp) throws IOException {
+    if (LOG.isDebugEnabled())
+      LOG.debug("Messages are saved to " + tmp.checkpointedPath());
+    final DataOutput out = new DataOutputStream(client.getOutputStream());
+    this.sched.schedule(new Callable<Object>() {
+      public Object call() throws Exception {
+        tmp.write(out);
+        return null;
+      }
+    }, 0, SECONDS);
+  }
+
+  public void close() {
+    try {
+      this.client.close();
+      this.sched.shutdown();
+    } catch (IOException io) {
+      LOG.error("Fail to close client socket.", io);
+    }
+  }
+
+}

Propchange: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageSerializer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java?rev=1186921&r1=1186920&r2=1186921&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java Thu Oct 20 16:58:17 2011
@@ -17,18 +17,16 @@
  */
 package org.apache.hama.bsp;
 
-import java.io.Closeable;
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hama.Constants;
 import org.apache.hama.ipc.HamaRPCProtocolVersion;
-import org.apache.zookeeper.KeeperException;
 
 /**
  * BSP communication interface.
  */
-public interface BSPPeer extends HamaRPCProtocolVersion, Closeable, Constants {
+public interface BSPPeer extends HamaRPCProtocolVersion, Constants {
 
   /**
    * Send a data with a tag to another BSPSlave corresponding to hostname.
@@ -74,10 +72,9 @@ public interface BSPPeer extends HamaRPC
    * Sends all the messages in the outgoing message queues to the corresponding
    * remote peers.
    * 
-   * @throws InterruptedException
-   * @throws KeeperException
+   * @throws InterruptedException 
    */
-  public void sync() throws IOException, KeeperException, InterruptedException;
+  public void sync() throws InterruptedException;
 
   /**
    * @return the count of current super-step

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1186921&r1=1186920&r2=1186921&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Thu Oct 20 16:58:17 2011
@@ -17,62 +17,36 @@
  */
 package org.apache.hama.bsp;
 
-import static java.util.concurrent.TimeUnit.SECONDS;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.UnknownHostException;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RPC.Server;
 import org.apache.hama.Constants;
+import org.apache.hama.bsp.sync.SyncClient;
+import org.apache.hama.bsp.sync.SyncServiceFactory;
 import org.apache.hama.checkpoint.CheckpointRunner;
 import org.apache.hama.ipc.BSPPeerProtocol;
-import org.apache.hama.zookeeper.QuorumPeer;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
 
 /**
  * This class represents a BSP peer.
  */
-public class BSPPeerImpl implements Watcher, BSPPeer {
+public class BSPPeerImpl implements BSPPeer {
 
   public static final Log LOG = LogFactory.getLog(BSPPeerImpl.class);
 
   private final Configuration conf;
-  private BSPJob jobConf;
+  private BSPJob bspJob;
 
   private volatile Server server = null;
-  private ZooKeeper zk = null;
-  private volatile Integer mutex = 0;
-
-  private final String bspRoot;
-  private final String quorumServers;
 
   private final Map<InetSocketAddress, BSPPeer> peers = new ConcurrentHashMap<InetSocketAddress, BSPPeer>();
   private final Map<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>> outgoingQueues = new ConcurrentHashMap<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>>();
@@ -90,114 +64,12 @@ public class BSPPeerImpl implements Watc
 
   private String[] allPeers;
 
-  public static final class BSPSerializableMessage implements Writable {
-    final AtomicReference<String> path = new AtomicReference<String>();
-    final AtomicReference<BSPMessageBundle> bundle = new AtomicReference<BSPMessageBundle>();
-
-    public BSPSerializableMessage() {
-    }
-
-    public BSPSerializableMessage(final String path,
-        final BSPMessageBundle bundle) {
-      if (null == path)
-        throw new NullPointerException("No path provided for checkpointing.");
-      if (null == bundle)
-        throw new NullPointerException("No data provided for checkpointing.");
-      this.path.set(path);
-      this.bundle.set(bundle);
-    }
-
-    public final String checkpointedPath() {
-      return this.path.get();
-    }
-
-    public final BSPMessageBundle messageBundle() {
-      return this.bundle.get();
-    }
-
-    @Override
-    public final void write(DataOutput out) throws IOException {
-      out.writeUTF(this.path.get());
-      this.bundle.get().write(out);
-    }
-
-    @Override
-    public final void readFields(DataInput in) throws IOException {
-      this.path.set(in.readUTF());
-      BSPMessageBundle pack = new BSPMessageBundle();
-      pack.readFields(in);
-      this.bundle.set(pack);
-    }
-
-  }// serializable message
-
-  final class BSPMessageSerializer {
-    final Socket client;
-    final ScheduledExecutorService sched;
-
-    public BSPMessageSerializer(final int port) {
-      Socket tmp = null;
-      int cnt = 0;
-      do {
-        tmp = init(port);
-        cnt++;
-        try {
-          Thread.sleep(1000);
-        } catch (InterruptedException ie) {
-          LOG.warn("Thread is interrupted.", ie);
-          Thread.currentThread().interrupt();
-        }
-      } while (null == tmp && 10 > cnt);
-      this.client = tmp;
-      if (null == this.client)
-        throw new NullPointerException("Client socket is null.");
-      this.sched = Executors.newScheduledThreadPool(conf.getInt(
-          "bsp.checkpoint.serializer_thread", 10));
-      LOG.info(BSPMessageSerializer.class.getName()
-          + " is ready to serialize message.");
-    }
-
-    private Socket init(final int port) {
-      Socket tmp = null;
-      try {
-        tmp = new Socket("localhost", port);
-      } catch (UnknownHostException uhe) {
-        LOG.error("Unable to connect to BSPMessageDeserializer.", uhe);
-      } catch (IOException ioe) {
-        LOG.warn("Fail to create socket.", ioe);
-      }
-      return tmp;
-    }
-
-    void serialize(final BSPSerializableMessage tmp) throws IOException {
-      if (LOG.isDebugEnabled())
-        LOG.debug("Messages are saved to " + tmp.checkpointedPath());
-      final DataOutput out = new DataOutputStream(client.getOutputStream());
-      this.sched.schedule(new Callable<Object>() {
-        public Object call() throws Exception {
-          tmp.write(out);
-          return null;
-        }
-      }, 0, SECONDS);
-    }
-
-    public void close() {
-      try {
-        this.client.close();
-        this.sched.shutdown();
-      } catch (IOException io) {
-        LOG.error("Fail to close client socket.", io);
-      }
-    }
-
-  }// message serializer
+  private SyncClient syncClient;
 
   /**
    * Protected default constructor for LocalBSPRunner.
    */
   protected BSPPeerImpl() {
-    bspRoot = null;
-    quorumServers = null;
     messageSerializer = null;
     conf = null;
   }
@@ -210,37 +82,44 @@ public class BSPPeerImpl implements Watc
    * @param conf is the configuration file containing bsp peer host, port, etc.
    * @param umbilical is the bsp protocol used to contact its parent process.
    * @param taskId is the id that current process holds.
+   * @throws Exception
    */
-  public BSPPeerImpl(Configuration conf, TaskAttemptID taskId,
-      BSPPeerProtocol umbilical) throws IOException {
+  public BSPPeerImpl(BSPJob job, Configuration conf, TaskAttemptID taskId,
+      BSPPeerProtocol umbilical) throws Exception {
     this.conf = conf;
     this.taskId = taskId;
     this.umbilical = umbilical;
+    this.bspJob = job;
 
     String bindAddress = conf.get(Constants.PEER_HOST,
         Constants.DEFAULT_PEER_HOST);
     int bindPort = conf
         .getInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT);
-    bspRoot = conf.get(Constants.ZOOKEEPER_ROOT,
-        Constants.DEFAULT_ZOOKEEPER_ROOT);
-    quorumServers = QuorumPeer.getZKQuorumServersString(conf);
-    LOG.debug("Quorum  " + quorumServers);
     peerAddress = new InetSocketAddress(bindAddress, bindPort);
     BSPMessageSerializer msgSerializer = null;
     if (this.conf.getBoolean("bsp.checkpoint.enabled", false)) {
-      msgSerializer = new BSPMessageSerializer(conf
-          .getInt("bsp.checkpoint.port", Integer
-              .valueOf(CheckpointRunner.DEFAULT_PORT)));
+      msgSerializer = new BSPMessageSerializer(conf,
+          conf.getInt("bsp.checkpoint.port",
+              Integer.valueOf(CheckpointRunner.DEFAULT_PORT)));
     }
     this.messageSerializer = msgSerializer;
+    initialize();
+    syncClient.register(taskId.getJobID(), taskId, peerAddress.getHostName(),
+        peerAddress.getPort());
+    // initial barrier syncing to get all the hosts to the same point, to get
+    // consistent peernames.
+    syncClient.enterBarrier(taskId.getJobID(), taskId, -1);
+    syncClient.leaveBarrier(taskId.getJobID(), taskId, -1);
+    setCurrentTaskStatus(new TaskStatus(taskId.getJobID(), taskId, 0, TaskStatus.State.RUNNING, "running", peerAddress.getHostName(),
+        TaskStatus.Phase.STARTING));
   }
 
-  public void reinitialize() {
+  public void initialize() throws Exception {
     try {
       if (LOG.isDebugEnabled())
         LOG.debug("reinitialize(): " + getPeerName());
-      this.server = RPC.getServer(this, peerAddress.getHostName(), peerAddress
-          .getPort(), conf);
+      this.server = RPC.getServer(this, peerAddress.getHostName(),
+          peerAddress.getPort(), conf);
       server.start();
       LOG.info(" BSPPeer address:" + peerAddress.getHostName() + " port:"
           + peerAddress.getPort());
@@ -248,23 +127,9 @@ public class BSPPeerImpl implements Watc
       LOG.error("Fail to start RPC server!", e);
     }
 
-    try {
-      this.zk = new ZooKeeper(quorumServers, conf.getInt(
-          Constants.ZOOKEEPER_SESSION_TIMEOUT, 1200000), this);
-    } catch (IOException e) {
-      LOG.error("Fail while reinitializing zookeeeper!", e);
-    }
-
-    try {
-      allPeers = zk.getChildren("/" + taskId.getJobID().toString(), this)
-          .toArray(new String[0]);
-    } catch (KeeperException e) {
-      e.printStackTrace();
-    } catch (InterruptedException e) {
-      e.printStackTrace();
-    }
-
-    Arrays.sort(allPeers);
+    syncClient = SyncServiceFactory.getSyncClient(conf);
+    syncClient.init(conf, taskId.getJobID(), taskId);
+    
   }
 
   @Override
@@ -304,7 +169,7 @@ public class BSPPeerImpl implements Watc
 
   private String checkpointedPath() {
     String backup = conf.get("bsp.checkpoint.prefix_path", "/checkpoint/");
-    String ckptPath = backup + jobConf.getJobID().toString() + "/"
+    String ckptPath = backup + bspJob.getJobID().toString() + "/"
         + getSuperstepCount() + "/" + this.taskId.toString();
     if (LOG.isDebugEnabled())
       LOG.debug("Messages are to be saved to " + ckptPath);
@@ -316,247 +181,63 @@ public class BSPPeerImpl implements Watc
    * @see org.apache.hama.bsp.BSPPeerInterface#sync()
    */
   @Override
-  public void sync() throws IOException, KeeperException, InterruptedException {
-    enterBarrier();
-    Iterator<Entry<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>>> it = this.outgoingQueues
-        .entrySet().iterator();
-
-    while (it.hasNext()) {
-      Entry<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>> entry = it
-          .next();
-
-      BSPPeer peer = getBSPPeerConnection(entry.getKey());
-      Iterable<BSPMessage> messages = entry.getValue();
-      BSPMessageBundle bundle = new BSPMessageBundle();
-      for (BSPMessage message : messages) {
-        bundle.addMessage(message);
-      }
-
-      // checkpointing
-      if (null != this.messageSerializer) {
-        this.messageSerializer.serialize(new BSPSerializableMessage(
-            checkpointedPath(), bundle));
-      }
-
-      peer.put(bundle);
-    }
-
-    leaveBarrier();
-    currentTaskStatus.incrementSuperstepCount();
-    umbilical.statusUpdate(taskId, currentTaskStatus);
-
-    // Clear outgoing queues.
-    clearOutgoingQueues();
-
-    // Add non-processed messages from this iteration for the next's queue.
-    while (!localQueue.isEmpty()) {
-      BSPMessage message = localQueue.poll();
-      localQueueForNextIteration.add(message);
-    }
-    // Switch local queues.
-    localQueue = localQueueForNextIteration;
-    localQueueForNextIteration = new ConcurrentLinkedQueue<BSPMessage>();
-  }
-
-  private void createZnode(final String path) throws KeeperException,
-      InterruptedException {
-    createZnode(path, CreateMode.PERSISTENT);
-  }
-
-  private void createEphemeralZnode(final String path) throws KeeperException,
-      InterruptedException {
-    createZnode(path, CreateMode.EPHEMERAL);
-  }
-
-  private void createZnode(final String path, final CreateMode mode)
-      throws KeeperException, InterruptedException {
-    synchronized (zk) {
-      Stat s = zk.exists(path, false);
-      if (null == s) {
-        try {
-          zk.create(path, null, Ids.OPEN_ACL_UNSAFE, mode);
-        } catch (KeeperException.NodeExistsException nee) {
-          LOG.warn("Ignore because znode may be already created at " + path,
-              nee);
+  public void sync() throws InterruptedException {
+    try {
+      enterBarrier();
+      Iterator<Entry<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>>> it = this.outgoingQueues
+          .entrySet().iterator();
+
+      while (it.hasNext()) {
+        Entry<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>> entry = it
+            .next();
+
+        BSPPeer peer = getBSPPeerConnection(entry.getKey());
+        Iterable<BSPMessage> messages = entry.getValue();
+        BSPMessageBundle bundle = new BSPMessageBundle();
+        for (BSPMessage message : messages) {
+          bundle.addMessage(message);
         }
-      }
-    }
-  }
 
-  private class BarrierWatcher implements Watcher {
-    private boolean complete = false;
-
-    boolean isComplete() {
-      return this.complete;
-    }
-
-    @Override
-    public void process(WatchedEvent event) {
-      this.complete = true;
-      synchronized (mutex) {
-        mutex.notifyAll();
-      }
-    }
-  }
-
-  protected boolean enterBarrier() throws KeeperException, InterruptedException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("[" + getPeerName() + "] enter the enterbarrier: "
-          + this.getSuperstepCount());
-    }
-
-    synchronized (zk) {
-      createZnode(bspRoot);
-      final String pathToJobIdZnode = bspRoot + "/"
-          + taskId.getJobID().toString();
-      createZnode(pathToJobIdZnode);
-      final String pathToSuperstepZnode = pathToJobIdZnode + "/"
-          + getSuperstepCount();
-      createZnode(pathToSuperstepZnode);
-      BarrierWatcher barrierWatcher = new BarrierWatcher();
-      Stat readyStat = zk.exists(pathToSuperstepZnode + "/ready",
-          barrierWatcher);
-      zk.create(getNodeName(), null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
-
-      List<String> znodes = zk.getChildren(pathToSuperstepZnode, false);
-      int size = znodes.size(); // may contains ready
-      boolean hasReady = znodes.contains("ready");
-      if (hasReady) {
-        size--;
-      }
-
-      LOG.debug("===> at superstep :" + getSuperstepCount()
-          + " current znode size: " + znodes.size() + " current znodes:"
-          + znodes);
-
-      if (LOG.isDebugEnabled())
-        LOG.debug("enterBarrier() znode size within " + pathToSuperstepZnode
-            + " is " + znodes.size() + ". Znodes include " + znodes);
-
-      if (size < jobConf.getNumBspTask()) {
-        LOG.info("1. At superstep: " + getSuperstepCount()
-            + " which task is waiting? " + taskId.toString()
-            + " stat is null? " + readyStat);
-        while (!barrierWatcher.isComplete()) {
-          if (!hasReady) {
-            synchronized (mutex) {
-              mutex.wait(1000);
-            }
-          }
+        // checkpointing
+        if (null != this.messageSerializer) {
+          this.messageSerializer.serialize(new BSPSerializableMessage(
+              checkpointedPath(), bundle));
         }
-        LOG.debug("2. at superstep: " + getSuperstepCount()
-            + " after waiting ..." + taskId.toString());
-      } else {
-        LOG.debug("---> at superstep: " + getSuperstepCount()
-            + " task that is creating /ready znode:" + taskId.toString());
-        createEphemeralZnode(pathToSuperstepZnode + "/ready");
-      }
-    }
-    return true;
-  }
 
-  protected boolean leaveBarrier() throws KeeperException, InterruptedException {
-    final String pathToSuperstepZnode = bspRoot + "/"
-        + taskId.getJobID().toString() + "/" + getSuperstepCount();
-    while (true) {
-      List<String> znodes = zk.getChildren(pathToSuperstepZnode, false);
-      LOG
-          .info("leaveBarrier() !!! checking znodes contnains /ready node or not: at superstep:"
-              + getSuperstepCount() + " znode:" + znodes);
-      if (znodes.contains("ready")) {
-        znodes.remove("ready");
+        peer.put(bundle);
       }
-      final int size = znodes.size();
 
-      LOG.debug("leaveBarrier() at superstep:" + getSuperstepCount()
-          + " znode size: (" + size + ") znodes:" + znodes);
-
-      if (null == znodes || znodes.isEmpty())
-        return true;
-      if (1 == size) {
-        try {
-          zk.delete(getNodeName(), 0);
-        } catch (KeeperException.NoNodeException nne) {
-          LOG.warn(
-              "+++ (znode size is 1). Ignore because znode may disconnect.",
-              nne);
-        }
-        return true;
-      }
-      Collections.sort(znodes);
+      leaveBarrier();
+      currentTaskStatus.incrementSuperstepCount();
+      umbilical.statusUpdate(taskId, currentTaskStatus);
 
-      final String lowest = znodes.get(0);
-      final String highest = znodes.get(size - 1);
+      // Clear outgoing queues.
+      clearOutgoingQueues();
 
-      LOG.info("leaveBarrier() at superstep: " + getSuperstepCount()
-          + " taskid:" + taskId.toString() + " lowest: " + lowest + " highest:"
-          + highest);
-      synchronized (mutex) {
-
-        if (getNodeName().equals(pathToSuperstepZnode + "/" + lowest)) {
-          Stat s = zk.exists(pathToSuperstepZnode + "/" + highest,
-              new Watcher() {
-                @Override
-                public void process(WatchedEvent event) {
-                  synchronized (mutex) {
-                    LOG.debug("leaveBarrier() at superstep: "
-                        + getSuperstepCount() + " taskid:" + taskId.toString()
-                        + " highest notify lowest.");
-                    mutex.notifyAll();
-                  }
-                }
-              });
-
-          if (null != s) {
-            LOG.debug("leaveBarrier(): superstep:" + getSuperstepCount()
-                + " taskid:" + taskId.toString() + " wait for higest notify.");
-            mutex.wait();
-          }
-        } else {
-          Stat s1 = zk.exists(getNodeName(), false);
-
-          if (null != s1) {
-            LOG.info("leaveBarrier() znode at superstep:" + getSuperstepCount()
-                + " taskid:" + taskId.toString() + " exists, so delete it.");
-            try {
-              zk.delete(getNodeName(), 0);
-            } catch (KeeperException.NoNodeException nne) {
-              LOG.warn("++++ Ignore because node may be dleted.", nne);
-            }
-          }
-
-          Stat s2 = zk.exists(pathToSuperstepZnode + "/" + lowest,
-              new Watcher() {
-                @Override
-                public void process(WatchedEvent event) {
-                  synchronized (mutex) {
-                    LOG.debug("leaveBarrier() at superstep: "
-                        + getSuperstepCount() + " taskid:" + taskId.toString()
-                        + " lowest notify other nodes.");
-                    mutex.notifyAll();
-                  }
-                }
-              });
-          if (null != s2) {
-            LOG.debug("leaveBarrier(): superstep:" + getSuperstepCount()
-                + " taskid:" + taskId.toString() + " wait for lowest notify.");
-            mutex.wait();
-          }
-        }
+      // Add non-processed messages from this iteration for the next's queue.
+      while (!localQueue.isEmpty()) {
+        BSPMessage message = localQueue.poll();
+        localQueueForNextIteration.add(message);
       }
+      // Switch local queues.
+      localQueue = localQueueForNextIteration;
+      localQueueForNextIteration = new ConcurrentLinkedQueue<BSPMessage>();
+    } catch (Exception e) {
+      LOG.fatal(
+          "Caught exception during superstep "
+              + currentTaskStatus.getSuperstepCount() + "!", e);
+      // throw new RuntimeException(e);
     }
   }
 
-  private String getNodeName() {
-    return bspRoot + "/" + taskId.getJobID().toString() + "/"
-        + getSuperstepCount() + "/" + taskId.toString();
+  protected void enterBarrier() throws Exception {
+    syncClient.enterBarrier(taskId.getJobID(), taskId,
+        currentTaskStatus.getSuperstepCount());
   }
 
-  @Override
-  public void process(WatchedEvent event) {
-    synchronized (mutex) {
-      mutex.notify();
-    }
+  protected void leaveBarrier() throws Exception {
+    syncClient.leaveBarrier(taskId.getJobID(), taskId,
+        currentTaskStatus.getSuperstepCount());
   }
 
   public void clear() {
@@ -564,14 +245,9 @@ public class BSPPeerImpl implements Watc
     this.outgoingQueues.clear();
   }
 
-  @Override
-  public void close() throws IOException {
+  public void close() throws Exception {
     this.clear();
-    try {
-      zk.close();
-    } catch (InterruptedException e) {
-      e.printStackTrace();
-    }
+    syncClient.close();
     if (server != null)
       server.stop();
     if (null != messageSerializer)
@@ -623,25 +299,34 @@ public class BSPPeerImpl implements Watc
           "Peername must consist of exactly ONE \":\"! Given peername was: "
               + peerName);
     }
-    return new InetSocketAddress(peerAddrParts[0], Integer
-        .valueOf(peerAddrParts[1]));
+    return new InetSocketAddress(peerAddrParts[0],
+        Integer.valueOf(peerAddrParts[1]));
   }
 
   @Override
   public String[] getAllPeerNames() {
+    initPeerNames();
     return allPeers;
   }
 
   @Override
   public String getPeerName(int index) {
+    initPeerNames();
     return allPeers[index];
   }
 
   @Override
   public int getNumPeers() {
+    initPeerNames();
     return allPeers.length;
   }
 
+  private void initPeerNames() {
+    if (allPeers == null) {
+      allPeers = syncClient.getAllPeerNames(taskId);
+    }
+  }
+
   /**
    * @return the number of messages
    */
@@ -666,15 +351,6 @@ public class BSPPeerImpl implements Watc
   }
 
   /**
-   * Sets the job configuration
-   * 
-   * @param jobConf
-   */
-  public void setJobConf(BSPJob jobConf) {
-    this.jobConf = jobConf;
-  }
-
-  /**
    * @return the size of local queue
    */
   public int getLocalQueueSize() {

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPSerializableMessage.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPSerializableMessage.java?rev=1186921&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPSerializableMessage.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPSerializableMessage.java Thu Oct 20 16:58:17 2011
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.hadoop.io.Writable;
+
+public class BSPSerializableMessage implements Writable {
+  final AtomicReference<String> path = new AtomicReference<String>();
+  final AtomicReference<BSPMessageBundle> bundle = new AtomicReference<BSPMessageBundle>();
+
+  public BSPSerializableMessage() {
+  }
+
+  public BSPSerializableMessage(final String path, final BSPMessageBundle bundle) {
+    if (null == path)
+      throw new NullPointerException("No path provided for checkpointing.");
+    if (null == bundle)
+      throw new NullPointerException("No data provided for checkpointing.");
+    this.path.set(path);
+    this.bundle.set(bundle);
+  }
+
+  public final String checkpointedPath() {
+    return this.path.get();
+  }
+
+  public final BSPMessageBundle messageBundle() {
+    return this.bundle.get();
+  }
+
+  @Override
+  public final void write(DataOutput out) throws IOException {
+    out.writeUTF(this.path.get());
+    this.bundle.get().write(out);
+  }
+
+  @Override
+  public final void readFields(DataInput in) throws IOException {
+    this.path.set(in.readUTF());
+    BSPMessageBundle pack = new BSPMessageBundle();
+    pack.readFields(in);
+    this.bundle.set(pack);
+  }
+}

Propchange: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPSerializableMessage.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java?rev=1186921&r1=1186920&r2=1186921&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java Thu Oct 20 16:58:17 2011
@@ -49,9 +49,9 @@ import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.DiskChecker;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.RunJar;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.checkpoint.CheckpointRunner;
@@ -61,12 +61,9 @@ import org.apache.hama.ipc.GroomProtocol
 import org.apache.hama.ipc.MasterProtocol;
 import org.apache.hama.zookeeper.QuorumPeer;
 import org.apache.log4j.LogManager;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.ZooDefs.Ids;
 
 /**
  * A Groom Server (shortly referred to as groom) is a process that performs bsp
@@ -153,13 +150,11 @@ public class GroomServer implements Runn
         assignedPeerNames = new HashMap<TaskAttemptID, Integer>();
         int i = 0;
 
-        // add peers to Zookeeper.
         // TODO find another way to manage all activate peers.
         for (GroomServerAction action : actions) {
           Task t = ((LaunchTaskAction) action).getTask();
 
           int peerPort = (Constants.DEFAULT_PEER_PORT + i);
-          registerPeerAddress(t.getJobID(), peerPort);
           assignedPeerNames.put(t.getTaskID(), peerPort);
 
           i++;
@@ -188,24 +183,6 @@ public class GroomServer implements Runn
         }
       }
     }
-
-    /**
-     * Register peer address to share all addresses among tasks.
-     * 
-     * @param jobID
-     * @param peerPort
-     */
-    private void registerPeerAddress(BSPJobID jobID, int peerPort) {
-      try {
-        zk.create(
-            "/" + jobID.toString() + "/" + groomHostName + ":" + peerPort,
-            new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
-      } catch (KeeperException e) {
-        e.printStackTrace();
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      }
-    }
   }
 
   private class Instructor extends Thread {
@@ -257,14 +234,14 @@ public class GroomServer implements Runn
 
     CheckpointRunner ckptRunner = null;
     if (this.conf.getBoolean("bsp.checkpoint.enabled", false)) {
-      ckptRunner = new CheckpointRunner(CheckpointRunner
-          .buildCommands(this.conf));
+      ckptRunner = new CheckpointRunner(
+          CheckpointRunner.buildCommands(this.conf));
     }
     this.checkpointRunner = ckptRunner;
 
     try {
-      zk = new ZooKeeper(QuorumPeer.getZKQuorumServersString(conf), conf
-          .getInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 1200000), this);
+      zk = new ZooKeeper(QuorumPeer.getZKQuorumServersString(conf),
+          conf.getInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 1200000), this);
     } catch (IOException e) {
       LOG.error("Exception during reinitialization!", e);
     }
@@ -276,8 +253,9 @@ public class GroomServer implements Runn
     }
 
     if (localHostname == null) {
-      this.localHostname = DNS.getDefaultHost(conf.get("bsp.dns.interface",
-          "default"), conf.get("bsp.dns.nameserver", "default"));
+      this.localHostname = DNS.getDefaultHost(
+          conf.get("bsp.dns.interface", "default"),
+          conf.get("bsp.dns.nameserver", "default"));
     }
     // check local disk
     checkLocalDirs(conf.getStrings("bsp.local.dir"));
@@ -904,8 +882,8 @@ public class GroomServer implements Runn
       // int ret = 0;
       if (null != args && 1 == args.length) {
         int port = Integer.parseInt(args[0]);
-        defaultConf.setInt("bsp.checkpoint.port", Integer
-            .parseInt(CheckpointRunner.DEFAULT_PORT));
+        defaultConf.setInt("bsp.checkpoint.port",
+            Integer.parseInt(CheckpointRunner.DEFAULT_PORT));
         if (LOG.isDebugEnabled())
           LOG.debug("Supplied checkpointer port value:" + port);
         Checkpointer ckpt = new Checkpointer(defaultConf);
@@ -952,13 +930,8 @@ public class GroomServer implements Runn
       }
       defaultConf.setInt(Constants.PEER_PORT, peerPort);
 
-      BSPPeerImpl bspPeer = new BSPPeerImpl(defaultConf, taskid, umbilical);
-      bspPeer.reinitialize();
-      bspPeer.setJobConf(job);
-
-      bspPeer.setCurrentTaskStatus(new TaskStatus(task.getJobID(), task
-          .getTaskID(), 0, TaskStatus.State.RUNNING, "running", host,
-          TaskStatus.Phase.STARTING));
+      // instantiate and init our peer
+      BSPPeerImpl bspPeer = new BSPPeerImpl(job, defaultConf, taskid, umbilical);
 
       try {
         // use job-specified working directory

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1186921&r1=1186920&r2=1186921&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Thu Oct 20 16:58:17 2011
@@ -42,7 +42,6 @@ import org.apache.hadoop.util.Reflection
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSPMaster.State;
 import org.apache.hama.ipc.JobSubmissionProtocol;
-import org.apache.zookeeper.KeeperException;
 
 /**
  * A multithreaded local BSP runner that can be used for debugging and local
@@ -59,6 +58,7 @@ public class LocalBSPRunner implements J
   protected static CyclicBarrier barrier;
 
   protected HashMap<String, LocalGroom> localGrooms = new HashMap<String, LocalGroom>();
+  protected String[] allPeers;
   protected String jobFile;
   protected String jobName;
 
@@ -84,6 +84,9 @@ public class LocalBSPRunner implements J
       String name = IDENTIFIER + " " + i;
       localGrooms.put(name, new LocalGroom(name));
     }
+    
+    allPeers = localGrooms.keySet().toArray(
+        new String[localGrooms.keySet().size()]);
 
   }
 
@@ -200,6 +203,8 @@ public class LocalBSPRunner implements J
       this.groom = groom;
     }
 
+    // deprecated until 0.5.0, then it will be removed.
+    @SuppressWarnings("deprecation")
     public void run() {
       bsp.setConf(conf);
       try {
@@ -253,7 +258,7 @@ public class LocalBSPRunner implements J
 
   }
 
-  class LocalGroom extends BSPPeerImpl {
+  final class LocalGroom implements BSPPeer {
     private long superStepCount = 0;
     private final ConcurrentLinkedQueue<BSPMessage> localMessageQueue = new ConcurrentLinkedQueue<BSPMessage>();
     // outgoing queue
@@ -293,8 +298,7 @@ public class LocalBSPRunner implements J
     }
 
     @Override
-    public void sync() throws IOException, KeeperException,
-        InterruptedException {
+    public void sync() throws InterruptedException {
       // wait until all threads reach this barrier
       barrierSync();
       // send the messages
@@ -302,7 +306,11 @@ public class LocalBSPRunner implements J
           .entrySet()) {
         String peerName = entry.getKey();
         for (BSPMessage msg : entry.getValue())
-          localGrooms.get(peerName).put(msg);
+          try {
+            localGrooms.get(peerName).put(msg);
+          } catch (IOException e) {
+            LOG.error("Putting message \"" + msg.toString() + "\" failed! ", e);
+          }
       }
       // clear the local outgoing queue
       outgoingQueues.clear();
@@ -336,8 +344,7 @@ public class LocalBSPRunner implements J
 
     @Override
     public String[] getAllPeerNames() {
-      return localGrooms.keySet().toArray(
-          new String[localGrooms.keySet().size()]);
+      return allPeers;
     }
 
     @Override
@@ -351,10 +358,6 @@ public class LocalBSPRunner implements J
       return 3;
     }
 
-    @Override
-    public void close() throws IOException {
-
-    }
 
     @Override
     public void put(BSPMessageBundle messages) throws IOException {
@@ -365,5 +368,15 @@ public class LocalBSPRunner implements J
       return conf;
     }
 
+    @Override
+    public String getPeerName(int index) {
+      return allPeers[index];
+    }
+
+    @Override
+    public int getNumPeers() {
+      return allPeers.length;
+    }
+
   }
 }

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncClient.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncClient.java?rev=1186921&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncClient.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncClient.java Thu Oct 20 16:58:17 2011
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.sync;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.bsp.BSPJobID;
+import org.apache.hama.bsp.TaskAttemptID;
+
+/**
+ * Basic interface for a client that connects to a sync server.
+ * 
+ */
+public interface SyncClient {
+
+  /**
+   * Init will be called within a spawned task, it should be used to initialize
+   * the inner structure and fields, e.g. a zookeeper client or an rpc
+   * connection to the real sync daemon.
+   * 
+   * @throws Exception
+   */
+  public void init(Configuration conf, BSPJobID jobId, TaskAttemptID taskId)
+      throws Exception;
+
+  /**
+   * Enters the barrier before the message sending in each superstep.
+   * 
+   * @param jobId the jobs ID
+   * @param taskId the tasks ID
+   * @param superstep the superstep of the task
+   * @throws Exception
+   */
+  public void enterBarrier(BSPJobID jobId, TaskAttemptID taskId, long superstep)
+      throws Exception;
+
+  /**
+   * Leaves the barrier after all communication has been done, this is usually
+   * the end of a superstep.
+   * 
+   * @param jobId the jobs ID
+   * @param taskId the tasks ID
+   * @param superstep the superstep of the task
+   */
+  public void leaveBarrier(BSPJobID jobId, TaskAttemptID taskId, long superstep)
+      throws Exception;
+
+  /**
+   * Registers a specific task, with its host and port, at the sync daemon.
+   * 
+   * @param jobId the jobs ID
+   * @param taskId the tasks ID
+   * @param hostAddress the host where the sync server resides
+   * @param port the port where the sync server is up
+   */
+  public void register(BSPJobID jobId, TaskAttemptID taskId,
+      String hostAddress, long port);
+
+  /**
+   * Returns all registered tasks within the sync daemon. They have to be
+   * ordered descending.
+   * 
+   * @param taskId the tasks ID
+   * @return an <b>ordered</b> string array of host:port pairs of all tasks
+   *         connected to the daemon.
+   */
+  public String[] getAllPeerNames(TaskAttemptID taskId);
+
+  /**
+   * TODO this has currently no use. Could later be used to deregister tasks
+   * from the barrier during runtime if they are finished. Something equal to
+   * voteToHalt() in Pregel.
+   * 
+   * @param jobId
+   * @param taskId
+   * @param hostAddress
+   * @param port
+   */
+  public void deregisterFromBarrier(BSPJobID jobId, TaskAttemptID taskId,
+      String hostAddress, long port);
+
+  /**
+   * This stops the sync daemon. Only used in YARN.
+   */
+  public void stopServer();
+
+  /**
+   * This method should close all used resources, e.g. a ZooKeeper instance.
+   * 
+   * @throws Exception
+   */
+  public void close() throws Exception;
+
+}

Propchange: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncClient.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncServer.java?rev=1186921&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncServer.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncServer.java Thu Oct 20 16:58:17 2011
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.sync;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Basic interface for a barrier synchronization service. This interface is not
+ * used by HAMA, because every syncserver has to run as a separate daemon. In
+ * YARN this is used to launch the sync server as part of the ApplicationMaster
+ * in a separate thread.
+ * 
+ */
+public interface SyncServer {
+
+  /**
+   * In YARN port and hostname of the sync server is only known at runtime, so
+   * this method should modify the conf to set the host:port of the syncserver
+   * that is going to start and return it.
+   * 
+   * @param conf
+   * @return
+   */
+  public Configuration init(Configuration conf) throws Exception;
+
+  /**
+   * Starts the server. This method can possibly block the call.
+   */
+  public void start() throws Exception;
+
+  /**
+   * Stops the server.
+   */
+  public void stopServer();
+
+}

Propchange: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncServer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncServerRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncServerRunner.java?rev=1186921&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncServerRunner.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncServerRunner.java Thu Oct 20 16:58:17 2011
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.sync;
+
+import java.util.concurrent.Callable;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Thread runner for a sync server.
+ * 
+ */
+public class SyncServerRunner implements Callable<Object> {
+
+  private SyncServer syncServer;
+
+  // use the SyncServiceFactory to obtain a new instance.
+  SyncServerRunner(Configuration conf) {
+    syncServer = SyncServiceFactory.getSyncServer(conf);
+  }
+
+  public Configuration init(Configuration conf) throws Exception {
+    return syncServer.init(conf);
+  }
+
+  @Override
+  public Object call() throws Exception {
+    syncServer.start();
+    return null;
+  }
+
+  @Override
+  protected void finalize() throws Throwable {
+    syncServer.stopServer();
+  }
+
+}
\ No newline at end of file

Propchange: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncServerRunner.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncServiceFactory.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncServiceFactory.java?rev=1186921&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncServiceFactory.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncServiceFactory.java Thu Oct 20 16:58:17 2011
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.sync;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.bsp.sync.zookeeper.ZooKeeperSyncClientImpl;
+import org.apache.hama.bsp.sync.zookeeper.ZooKeeperSyncServerImpl;
+
+public class SyncServiceFactory {
+
+  private static final Log LOG = LogFactory.getLog(SyncServiceFactory.class);
+
+  public static final String SYNC_SERVER_CLASS = "hama.sync.server.class";
+  public static final String SYNC_CLIENT_CLASS = "hama.sync.client.class";
+
+  /**
+   * Returns a sync client via reflection based on what was configured.
+   * 
+   * @param conf
+   * @return
+   */
+  public static SyncClient getSyncClient(Configuration conf) {
+    if (conf.get(SYNC_CLIENT_CLASS) != null) {
+      try {
+        return (SyncClient) ReflectionUtils.newInstance(
+            conf.getClassByName(conf.get(SYNC_CLIENT_CLASS)), conf);
+      } catch (ClassNotFoundException e) {
+        LOG.error(
+            "Class for sync client has not been found, returning default zookeeper client!",
+            e);
+      }
+    } else {
+      LOG.info("No property set for \"hama.sync.client.class\", using default zookeeper client");
+    }
+    return ReflectionUtils.newInstance(ZooKeeperSyncClientImpl.class, conf);
+  }
+
+  /**
+   * Returns a sync server via reflection based on what was configured.
+   * Falls back to the default ZooKeeper sync server if none can be loaded.
+   * @param conf the configuration holding the "hama.sync.server.class" key
+   * @return the configured sync server or the default ZooKeeper sync server
+   */
+  public static SyncServer getSyncServer(Configuration conf) {
+    if (conf.get(SYNC_SERVER_CLASS) != null) {
+      try {
+        return (SyncServer) ReflectionUtils.newInstance(
+            conf.getClassByName(conf.get(SYNC_SERVER_CLASS)), conf);
+      } catch (ClassNotFoundException e) {
+        LOG.error(
+            "Class for sync server has not been found, returning default zookeeper server!",
+            e);
+      }
+    } else {
+      LOG.info("No property set for \"hama.sync.server.class\", using default zookeeper server");
+    }
+    return ReflectionUtils.newInstance(ZooKeeperSyncServerImpl.class, conf);
+  }
+
+  /**
+   * Returns a new runner that wraps the sync server configured in conf.
+   * 
+   * @param conf
+   * @return
+   */
+  public static SyncServerRunner getSyncServerRunner(Configuration conf) {
+    return new SyncServerRunner(conf);
+  }
+
+}

Propchange: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncServiceFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/rpc/RPCSyncClientImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/rpc/RPCSyncClientImpl.java?rev=1186921&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/rpc/RPCSyncClientImpl.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/rpc/RPCSyncClientImpl.java Thu Oct 20 16:58:17 2011
@@ -0,0 +1,65 @@
+package org.apache.hama.bsp.sync.rpc;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.bsp.BSPJobID;
+import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.bsp.sync.SyncClient;
+
+/**
+ * Client side implementation of an example RPC sync client. Basically it just
+ * proxies all the calls to the RPC server and returns the result.
+ */
+public class RPCSyncClientImpl implements SyncClient {
+
+  private RPCSyncServer syncService;
+
+  @Override
+  public void init(Configuration conf, BSPJobID jobId, TaskAttemptID taskId)
+      throws Exception {
+    syncService = RPCSyncServerImpl.getService(conf);
+  }
+
+  @Override
+  public void enterBarrier(BSPJobID jobId, TaskAttemptID taskId, long superstep)
+      throws Exception {
+    syncService.enterBarrier(taskId);
+  }
+
+  @Override
+  public void leaveBarrier(BSPJobID jobId, TaskAttemptID taskId, long superstep)
+      throws Exception {
+    syncService.leaveBarrier(taskId);
+  }
+
+  @Override
+  public void register(BSPJobID jobId, TaskAttemptID taskId,
+      String hostAddress, long port) {
+    syncService.register(taskId, new Text(hostAddress), new LongWritable(port));
+  }
+
+  @Override
+  public String[] getAllPeerNames(TaskAttemptID taskId) {
+    // our sync service ensures the order of the peers
+    return syncService.getAllPeerNames().get();
+  }
+
+  @Override
+  public void deregisterFromBarrier(BSPJobID jobId, TaskAttemptID taskId,
+      String hostAddress, long port) {
+    syncService.deregisterFromBarrier(taskId, new Text(hostAddress),
+        new LongWritable(port));
+  }
+
+  @Override
+  public void stopServer() {
+    syncService.stopServer();
+  }
+
+  @Override
+  public void close() throws Exception {
+    
+  }
+
+}

Propchange: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/rpc/RPCSyncClientImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/rpc/RPCSyncServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/rpc/RPCSyncServer.java?rev=1186921&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/rpc/RPCSyncServer.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/rpc/RPCSyncServer.java Thu Oct 20 16:58:17 2011
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.sync.rpc;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.util.StringArrayWritable;
+
+/**
+ * Hadoop RPC based barrier synchronization service.
+ * 
+ */
+public interface RPCSyncServer extends VersionedProtocol {
+
+  public static final long versionID = 0L;
+
+  public void enterBarrier(TaskAttemptID id);
+
+  public void leaveBarrier(TaskAttemptID id);
+
+  public void register(TaskAttemptID id, Text hostAddress, LongWritable port);
+
+  public LongWritable getSuperStep();
+
+  public StringArrayWritable getAllPeerNames();
+
+  public void deregisterFromBarrier(TaskAttemptID id, Text hostAddress,
+      LongWritable port);
+  
+  public void stopServer();
+
+}

Propchange: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/rpc/RPCSyncServer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/rpc/RPCSyncServerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/rpc/RPCSyncServerImpl.java?rev=1186921&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/rpc/RPCSyncServerImpl.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/rpc/RPCSyncServerImpl.java Thu Oct 20 16:58:17 2011
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.sync.rpc;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RPC.Server;
+import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.bsp.sync.SyncServer;
+import org.apache.hama.util.BSPNetUtils;
+import org.apache.hama.util.StringArrayWritable;
+
+/**
+ * Example synchronization daemon built on Hadoop's RPC. <br/>
+ */
+public class RPCSyncServerImpl implements SyncServer, RPCSyncServer {
+
+  private static final Log LOG = LogFactory.getLog(RPCSyncServerImpl.class);
+
+  private Configuration conf = new Configuration();
+  private Server server;
+
+  private int parties;
+
+  private CyclicBarrier barrier;
+  private CyclicBarrier leaveBarrier;
+  private Set<Integer> partySet;
+  private Set<String> peerAddresses;
+
+  private volatile long superstep = 0L;
+
+  /**
+   * No-argument constructor so that the class can be instantiated via
+   * reflection. It performs no setup itself; the barriers and the RPC server
+   * are created by {@link #init(Configuration)}.
+   */
+  public RPCSyncServerImpl() {
+  }
+
+  /**
+   * Used by the main method if someone decides to launch the server as a
+   * separate service.
+   * 
+   * @param parties number of tasks both barriers wait for.
+   * @param host host name the RPC server is created with.
+   * @param port port the RPC server is created with.
+   * @throws IOException if the RPC server cannot be created.
+   */
+  RPCSyncServerImpl(int parties, String host, int port) throws IOException {
+    initInternal(parties, host, port);
+  }
+
+  /**
+   * Sets the server up on the canonical host name and a free port, sized for
+   * the number of peers given by "bsp.peers.num" (default 1), and stores the
+   * resulting address in the passed configuration under
+   * "hama.sync.server.address".
+   * 
+   * NOTE(review): the passed configuration is not stored in the {@code conf}
+   * field, so the RPC server is created with the field's default
+   * configuration - confirm this is intended.
+   * 
+   * @param conf configuration to read the peer count from and to write the
+   *          server address to.
+   * @return the same configuration instance, now containing the address.
+   * @throws Exception if host/port lookup or the server creation fails.
+   */
+  @Override
+  public Configuration init(Configuration conf) throws Exception {
+    final String hostName = BSPNetUtils.getCanonicalHostname();
+    final int freePort = BSPNetUtils.getFreePort();
+    final String address = hostName + ":" + freePort;
+
+    this.parties = conf.getInt("bsp.peers.num", 1);
+    initInternal(this.parties, hostName, freePort);
+
+    // publish where the server listens so that clients can look it up
+    conf.set("hama.sync.server.address", address);
+    return conf;
+  }
+
+  private void initInternal(int parties, String host, int port)
+      throws IOException {
+    this.parties = parties;
+    this.barrier = new CyclicBarrier(parties);
+    this.leaveBarrier = new CyclicBarrier(parties, new SuperStepIncrementor(
+        this));
+
+    this.partySet = Collections.synchronizedSet(new HashSet<Integer>(parties));
+    // tree set so there is ascending order for consistent returns in
+    // getAllPeerNames()
+    this.peerAddresses = Collections.synchronizedSet(new TreeSet<String>());
+    // allocate ten more rpc handler than parties for additional services to
+    // plug in or to deal with failure.
+    this.server = RPC.getServer(this, host, port, parties + 10, false, conf);
+    LOG.info("Sync Server is now up at: " + host + ":" + port + "!");
+  }
+
+  /**
+   * Starts the RPC server that was created during initialization.
+   * 
+   * @throws IOException if the server cannot be started.
+   */
+  public void start() throws IOException {
+    server.start();
+  }
+
+  /**
+   * Stops the RPC server.
+   */
+  @Override
+  public void stopServer() {
+    server.stop();
+  }
+
+  /**
+   * Delegates to the RPC server's {@code join()}; used by the main method to
+   * keep the process alive after the server has been started.
+   * 
+   * @throws InterruptedException if the calling thread is interrupted while
+   *           waiting.
+   */
+  public void join() throws InterruptedException {
+    server.join();
+  }
+
+  /**
+   * Creates an RPC proxy to the sync server whose address is stored in the
+   * given configuration under "hama.sync.server.address".
+   * 
+   * @param conf configuration containing the server address as "host:port".
+   * @return a proxy for the remote {@link RPCSyncServer}.
+   * @throws IllegalArgumentException if the property is missing, empty,
+   *           contains no colon or has no port after the colon.
+   * @throws NumberFormatException if the port part is not a valid integer.
+   * @throws IOException if the proxy cannot be obtained.
+   */
+  public static RPCSyncServer getService(Configuration conf)
+      throws NumberFormatException, IOException {
+    String syncAddress = conf.get("hama.sync.server.address");
+    if (syncAddress == null || syncAddress.isEmpty()
+        || !syncAddress.contains(":")) {
+      throw new IllegalArgumentException(
+          "Server sync address must contain a colon and must be non-empty and not-null! Property \"hama.sync.server.address\" was: "
+              + syncAddress);
+    }
+    String[] hostPort = syncAddress.split(":");
+    // split() drops trailing empty strings, so "host:" or ":" yields fewer
+    // than two elements. Reject that explicitly instead of failing with an
+    // ArrayIndexOutOfBoundsException below.
+    if (hostPort.length < 2) {
+      throw new IllegalArgumentException(
+          "Server sync address must be of the form host:port! Property \"hama.sync.server.address\" was: "
+              + syncAddress);
+    }
+    return (RPCSyncServer) RPC.waitForProxy(RPCSyncServer.class, versionID,
+        new InetSocketAddress(hostPort[0], Integer.valueOf(hostPort[1])), conf);
+
+  }
+
+  @Override
+  public void enterBarrier(TaskAttemptID id) {
+    LOG.info("Task: " + id.getId() + " entered Barrier!");
+    if (partySet.contains(id.getId())) {
+      try {
+        barrier.await();
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      } catch (BrokenBarrierException e) {
+        e.printStackTrace();
+      }
+    } else {
+      LOG.warn("TaskID " + id + " is no verified task!");
+    }
+  }
+
+  @Override
+  public void leaveBarrier(TaskAttemptID id) {
+    LOG.info("Task: " + id.getId() + " leaves Barrier!");
+    if (partySet.contains(id.getId())) {
+      try {
+        leaveBarrier.await();
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      } catch (BrokenBarrierException e) {
+        e.printStackTrace();
+      }
+    } else {
+      LOG.warn("TaskID " + id + " is no verified task!");
+    }
+  }
+
+  /**
+   * Remembers the id of the task and the "host:port" name of its peer. Only
+   * registered ids are allowed to wait at the barriers. Registering more
+   * tasks than configured parties is not rejected, it only causes a warning.
+   * 
+   * @param id attempt id of the task to register.
+   * @param hostAddress host address of the task's peer.
+   * @param port port of the task's peer.
+   */
+  @Override
+  public synchronized void register(TaskAttemptID id, Text hostAddress,
+      LongWritable port) {
+    partySet.add(id.getId());
+
+    final String peerName = hostAddress.toString() + ":" + port.get();
+    peerAddresses.add(peerName);
+    LOG.info("Registered: " + id.getId() + " for peer " + peerName);
+
+    final boolean overbooked = parties < partySet.size();
+    if (overbooked) {
+      LOG.warn("Registered more tasks than configured!");
+    }
+  }
+
+  /**
+   * Echoes the version sent by the client; neither the protocol name nor
+   * {@code versionID} is consulted.
+   * 
+   * NOTE(review): presumably this makes every client version pass the RPC
+   * version check - confirm that this is intended.
+   * 
+   * @param protocol name of the requested protocol (ignored).
+   * @param clientVersion version the client claims to speak.
+   * @return {@code clientVersion}, unchanged.
+   */
+  @Override
+  public long getProtocolVersion(String protocol, long clientVersion)
+      throws IOException {
+    return clientVersion;
+  }
+
+  /**
+   * Barrier action of the leave barrier: advances the superstep counter of
+   * the server by one each time it is run.
+   */
+  private static class SuperStepIncrementor implements Runnable {
+
+    private final RPCSyncServerImpl owner;
+
+    public SuperStepIncrementor(RPCSyncServerImpl syncServer) {
+      this.owner = syncServer;
+    }
+
+    @Override
+    public void run() {
+      // same monitor as the synchronized getSuperStep(), so readers never
+      // interleave with the update.
+      synchronized (owner) {
+        final long next = owner.superstep + 1L;
+        owner.superstep = next;
+        LOG.info("Entering superstep: " + next);
+      }
+    }
+
+  }
+
+  /**
+   * Launches the sync server as a standalone service and blocks until the
+   * server terminates.
+   * 
+   * @param args exactly three values: number of parties, host and port.
+   * @throws IllegalArgumentException if the argument count is not three.
+   */
+  public static void main(String[] args) throws IOException,
+      InterruptedException {
+    final String given = Arrays.toString(args);
+    LOG.info(given);
+    if (args.length != 3) {
+      throw new IllegalArgumentException(
+          "Argument count does not match 3! Given size was " + args.length
+              + " and parameters were " + given);
+    }
+
+    final int parties = Integer.valueOf(args[0]);
+    final String host = args[1];
+    final int port = Integer.valueOf(args[2]);
+
+    RPCSyncServerImpl syncServer = new RPCSyncServerImpl(parties, host, port);
+    syncServer.start();
+    syncServer.join();
+  }
+
+  /**
+   * @return the current value of the superstep counter, wrapped into a new
+   *         {@link LongWritable}.
+   */
+  @Override
+  public synchronized LongWritable getSuperStep() {
+    return new LongWritable(superstep);
+  }
+
+  /**
+   * @return a snapshot of all registered peer addresses ("host:port"), in the
+   *         ascending order of the underlying tree set.
+   */
+  @Override
+  public synchronized StringArrayWritable getAllPeerNames() {
+    final String[] names = new String[peerAddresses.size()];
+    return new StringArrayWritable(peerAddresses.toArray(names));
+  }
+
+  /**
+   * Not implemented yet: the call currently returns without changing any
+   * state, so the task stays registered.
+   * 
+   * @param id attempt id of the task to deregister (currently ignored).
+   * @param hostAddress host address of the task's peer (currently ignored).
+   * @param port port of the task's peer (currently ignored).
+   */
+  @Override
+  public void deregisterFromBarrier(TaskAttemptID id, Text hostAddress,
+      LongWritable port) {
+    // TODO Auto-generated method stub
+    // basically has to recreate the barriers and remove from the two basic
+    // sets.
+  }
+}

Propchange: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/rpc/RPCSyncServerImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/zookeeper/ZooKeeperSyncClientImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/zookeeper/ZooKeeperSyncClientImpl.java?rev=1186921&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/zookeeper/ZooKeeperSyncClientImpl.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/zookeeper/ZooKeeperSyncClientImpl.java Thu Oct 20 16:58:17 2011
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.sync.zookeeper;
+
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.Constants;
+import org.apache.hama.bsp.BSPJobID;
+import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.bsp.sync.SyncClient;
+import org.apache.hama.zookeeper.QuorumPeer;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * This client class abstracts the use of our zookeeper sync code. <br/>
+ * <br/>
+ * TODO maybe extract an abstract class and let the subclasses implement
+ * enter-/leaveBarrier so we can have multiple implementations, just like
+ * goldenorb.
+ * 
+ */
+public class ZooKeeperSyncClientImpl implements SyncClient, Watcher {
+
+  public static final Log LOG = LogFactory
+      .getLog(ZooKeeperSyncClientImpl.class);
+
+  private volatile Integer mutex = 0;
+
+  private String quorumServers;
+  private ZooKeeper zk;
+  private String bspRoot;
+  private InetSocketAddress peerAddress;
+  private int numBSPTasks;
+  // allPeers is lazily initialized
+  private String[] allPeers;
+
+  /**
+   * Connects to the ZooKeeper quorum and reads the znode root, the address of
+   * this peer and the number of BSP tasks from the configuration. This
+   * instance is handed to ZooKeeper as its watcher.
+   * 
+   * @param conf configuration to read the settings from.
+   * @param jobId not read by this method.
+   * @param taskId not read by this method.
+   * @throws Exception if the ZooKeeper client cannot be created.
+   */
+  @Override
+  public void init(Configuration conf, BSPJobID jobId, TaskAttemptID taskId)
+      throws Exception {
+    this.quorumServers = QuorumPeer.getZKQuorumServersString(conf);
+    final int sessionTimeout = conf.getInt(
+        Constants.ZOOKEEPER_SESSION_TIMEOUT, 1200000);
+    this.zk = new ZooKeeper(this.quorumServers, sessionTimeout, this);
+
+    this.bspRoot = conf.get(Constants.ZOOKEEPER_ROOT,
+        Constants.DEFAULT_ZOOKEEPER_ROOT);
+
+    final String host = conf.get(Constants.PEER_HOST,
+        Constants.DEFAULT_PEER_HOST);
+    final int port = conf.getInt(Constants.PEER_PORT,
+        Constants.DEFAULT_PEER_PORT);
+    this.peerAddress = new InetSocketAddress(host, port);
+
+    this.numBSPTasks = conf.getInt("bsp.peers.num", 1);
+  }
+
+  /**
+   * Enters the barrier of the given superstep. The method makes sure that the
+   * znodes for the root, the job and the superstep exist, creates an
+   * ephemeral znode for this task below the superstep znode and then either
+   * waits until the watch on the "ready" znode fires or, if at least
+   * {@code numBSPTasks} task znodes are present, creates the "ready" znode
+   * itself.
+   * <p>
+   * The whole method, including the waiting, runs while holding the monitor
+   * of {@code zk}.
+   * 
+   * @param jobId not read by this method; the job id is taken from
+   *          {@code taskId}.
+   * @param taskId attempt id of the entering task.
+   * @param superstep the superstep whose barrier is entered.
+   * @throws Exception anything raised by the ZooKeeper calls or by the wait.
+   */
+  @Override
+  public void enterBarrier(BSPJobID jobId, TaskAttemptID taskId, long superstep)
+      throws Exception {
+    LOG.debug("[" + getPeerName() + "] enter the enterbarrier: " + superstep);
+
+    synchronized (zk) {
+      // make sure the persistent parents root/job/superstep exist
+      createZnode(bspRoot);
+      final String pathToJobIdZnode = bspRoot + "/"
+          + taskId.getJobID().toString();
+      createZnode(pathToJobIdZnode);
+      final String pathToSuperstepZnode = pathToJobIdZnode + "/" + superstep;
+      createZnode(pathToSuperstepZnode);
+      // set the watch on "ready" before announcing this task, so that the
+      // watcher is in place when another task creates the znode.
+      BarrierWatcher barrierWatcher = new BarrierWatcher();
+      Stat readyStat = zk.exists(pathToSuperstepZnode + "/ready",
+          barrierWatcher);
+      zk.create(getNodeName(taskId, superstep), null, Ids.OPEN_ACL_UNSAFE,
+          CreateMode.EPHEMERAL);
+
+      List<String> znodes = zk.getChildren(pathToSuperstepZnode, false);
+      int size = znodes.size(); // may include the "ready" znode
+      boolean hasReady = znodes.contains("ready");
+      if (hasReady) {
+        // count task znodes only
+        size--;
+      }
+
+      LOG.debug("===> at superstep :" + superstep + " current znode size: "
+          + znodes.size() + " current znodes:" + znodes);
+
+      LOG.debug("enterBarrier() znode size within " + pathToSuperstepZnode
+          + " is " + znodes.size() + ". Znodes include " + znodes);
+
+      if (size < numBSPTasks) {
+        // not all tasks have arrived yet: wait for the watcher to fire
+        LOG.info("1. At superstep: " + superstep + " which task is waiting? "
+            + taskId.toString() + " stat is null? " + readyStat);
+        // NOTE(review): hasReady is computed once above and never refreshed.
+        // If it is true here, the loop polls the watcher without waiting
+        // until the watcher fires - confirm that this is intended.
+        while (!barrierWatcher.isComplete()) {
+          if (!hasReady) {
+            synchronized (mutex) {
+              // wake up at least once per second to re-check the watcher
+              mutex.wait(1000);
+            }
+          }
+        }
+        LOG.debug("2. at superstep: " + superstep + " after waiting ..."
+            + taskId.toString());
+      } else {
+        // enough task znodes are present: this task creates "ready", which
+        // triggers the watchers set by the other tasks.
+        LOG.debug("---> at superstep: " + superstep
+            + " task that is creating /ready znode:" + taskId.toString());
+        createEphemeralZnode(pathToSuperstepZnode + "/ready");
+      }
+    }
+  }
+
+  /**
+   * Leaves the barrier of the given superstep. In a loop the children of the
+   * superstep znode are read, ignoring "ready":
+   * <ul>
+   * <li>no child left: return,</li>
+   * <li>one child left: delete the znode of this task and return,</li>
+   * <li>otherwise: the task whose znode sorts lowest waits on a watch on the
+   * highest znode; every other task deletes its own znode and waits on a
+   * watch on the lowest znode. After being notified the loop starts over.</li>
+   * </ul>
+   * 
+   * @param jobId not read by this method; the job id is taken from
+   *          {@code taskId}.
+   * @param taskId attempt id of the leaving task.
+   * @param superstep the superstep whose barrier is left.
+   * @throws Exception anything raised by the ZooKeeper calls or by the wait.
+   */
+  @Override
+  public void leaveBarrier(final BSPJobID jobId, final TaskAttemptID taskId,
+      final long superstep) throws Exception {
+    final String pathToSuperstepZnode = bspRoot + "/"
+        + taskId.getJobID().toString() + "/" + superstep;
+    while (true) {
+      List<String> znodes = zk.getChildren(pathToSuperstepZnode, false);
+      LOG.debug("leaveBarrier() !!! checking znodes contnains /ready node or not: at superstep:"
+          + superstep + " znode:" + znodes);
+      // only task znodes count, so drop the "ready" marker from the list
+      if (znodes.contains("ready")) {
+        znodes.remove("ready");
+      }
+      final int size = znodes.size();
+
+      LOG.debug("leaveBarrier() at superstep:" + superstep + " znode size: ("
+          + size + ") znodes:" + znodes);
+
+      // NOTE(review): znodes has already been dereferenced above, so the null
+      // part of this check can never be the one that triggers.
+      if (null == znodes || znodes.isEmpty())
+        return;
+      if (1 == size) {
+        // last znode left: remove the znode of this task and finish
+        try {
+          zk.delete(getNodeName(taskId, superstep), 0);
+        } catch (KeeperException.NoNodeException nne) {
+          LOG.warn(
+              "+++ (znode size is 1). Ignore because znode may disconnect.",
+              nne);
+        }
+        return;
+      }
+      Collections.sort(znodes);
+
+      final String lowest = znodes.get(0);
+      final String highest = znodes.get(size - 1);
+
+      LOG.info("leaveBarrier() at superstep: " + superstep + " taskid:"
+          + taskId.toString() + " lowest: " + lowest + " highest:" + highest);
+      synchronized (mutex) {
+
+        if (getNodeName(taskId, superstep).equals(
+            pathToSuperstepZnode + "/" + lowest)) {
+          // this task owns the lowest znode: keep it and watch the highest
+          Stat s = zk.exists(pathToSuperstepZnode + "/" + highest,
+              new Watcher() {
+                @Override
+                public void process(WatchedEvent event) {
+                  synchronized (mutex) {
+                    LOG.debug("leaveBarrier() at superstep: " + superstep
+                        + " taskid:" + taskId.toString()
+                        + " highest notify lowest.");
+                    mutex.notifyAll();
+                  }
+                }
+              });
+
+          if (null != s) {
+            // the highest znode still exists: wait (without timeout) until a
+            // watcher notifies the mutex
+            LOG.debug("leaveBarrier(): superstep:" + superstep + " taskid:"
+                + taskId.toString() + " wait for higest notify.");
+            mutex.wait();
+          }
+        } else {
+          // every other task removes its own znode first ...
+          Stat s1 = zk.exists(getNodeName(taskId, superstep), false);
+
+          if (null != s1) {
+            LOG.info("leaveBarrier() znode at superstep:" + superstep
+                + " taskid:" + taskId.toString() + " exists, so delete it.");
+            try {
+              zk.delete(getNodeName(taskId, superstep), 0);
+            } catch (KeeperException.NoNodeException nne) {
+              LOG.warn("++++ Ignore because node may be dleted.", nne);
+            }
+          }
+
+          // ... and then watches the lowest znode
+          Stat s2 = zk.exists(pathToSuperstepZnode + "/" + lowest,
+              new Watcher() {
+                @Override
+                public void process(WatchedEvent event) {
+                  synchronized (mutex) {
+                    LOG.debug("leaveBarrier() at superstep: " + superstep
+                        + " taskid:" + taskId.toString()
+                        + " lowest notify other nodes.");
+                    mutex.notifyAll();
+                  }
+                }
+              });
+          if (null != s2) {
+            // the lowest znode still exists: wait (without timeout) until a
+            // watcher notifies the mutex
+            LOG.debug("leaveBarrier(): superstep:" + superstep + " taskid:"
+                + taskId.toString() + " wait for lowest notify.");
+            mutex.wait();
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Registers the peer address for the given job by delegating to
+   * {@link #registerTask(ZooKeeper, BSPJobID, String, long)} with the
+   * ZooKeeper handle of this client.
+   * 
+   * @param jobId id of the job the peer belongs to.
+   * @param taskId not used by this implementation.
+   * @param hostAddress host address of the peer.
+   * @param port port of the peer.
+   */
+  @Override
+  public void register(BSPJobID jobId, TaskAttemptID taskId,
+      String hostAddress, long port) {
+    registerTask(zk, jobId, hostAddress, port);
+  }
+
+  /**
+   * Registers the task from outside, most of the time used by the groom which
+   * uses this at task spawn-time.
+   * 
+   * @param zk
+   * @param jobId
+   * @param taskId
+   * @param hostAddress
+   * @param port
+   */
+  public static void registerTask(ZooKeeper zk, BSPJobID jobId,
+      String hostAddress, long port) {
+    try {
+      zk.create("/" + jobId.toString() + "/" + hostAddress + ":" + port,
+          new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+    } catch (KeeperException e) {
+      LOG.error(e);
+    } catch (InterruptedException e) {
+      LOG.error(e);
+    }
+  }
+
+  @Override
+  public String[] getAllPeerNames(TaskAttemptID taskId) {
+    if (allPeers == null) {
+      try {
+        allPeers = zk.getChildren("/" + taskId.getJobID().toString(), this)
+            .toArray(new String[0]);
+      } catch (Exception e) {
+        LOG.error(e);
+        throw new NullPointerException("All peer names could not be retrieved!");
+      }
+      // don't forget to sort the peers, since zookeeper does not care about
+      // ordering the children.
+      Arrays.sort(allPeers);
+    }
+    return allPeers;
+  }
+
+  /**
+   * Closes the ZooKeeper handle of this client.
+   * 
+   * @throws Exception if closing the handle fails.
+   */
+  @Override
+  public void close() throws Exception {
+    zk.close();
+  }
+
+  /**
+   * Not supported by this implementation yet.
+   * 
+   * @throws UnsupportedOperationException always.
+   */
+  @Override
+  public void deregisterFromBarrier(BSPJobID jobId, TaskAttemptID taskId,
+      String hostAddress, long port) {
+    // TODO
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Not supported by this implementation yet.
+   * 
+   * @throws UnsupportedOperationException always.
+   */
+  @Override
+  public void stopServer() {
+    // TODO
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Watcher callback of this client, which is registered with the ZooKeeper
+   * handle in init() and with the children lookup in getAllPeerNames(). The
+   * event itself is not inspected; a single thread waiting on the mutex is
+   * woken up.
+   * 
+   * @param event the event delivered by ZooKeeper (ignored).
+   */
+  @Override
+  public void process(WatchedEvent event) {
+    synchronized (mutex) {
+      mutex.notify();
+    }
+  }
+
+  /*
+   * UTILITY METHODS
+   */
+
+  /**
+   * Builds the name of this peer from the address that was read from the
+   * configuration in init().
+   * 
+   * @return the name of this peer as "host:port".
+   */
+  public String getPeerName() {
+    return peerAddress.getHostName() + ":" + peerAddress.getPort();
+  }
+
+  /**
+   * Builds the path of the znode that represents the given task in the given
+   * superstep: {@code <bspRoot>/<jobId>/<superstep>/<taskId>}.
+   * 
+   * @param taskId attempt id of the task.
+   * @param superstep the superstep the znode belongs to.
+   * @return the full znode path.
+   */
+  private String getNodeName(TaskAttemptID taskId, long superstep) {
+    return bspRoot + "/" + taskId.getJobID().toString() + "/" + superstep + "/"
+        + taskId.toString();
+  }
+
+  /**
+   * Creates a persistent znode at the given path if it does not exist yet.
+   * 
+   * @param path path of the znode to create.
+   */
+  private void createZnode(final String path) throws KeeperException,
+      InterruptedException {
+    createZnode(path, CreateMode.PERSISTENT);
+  }
+
+  /**
+   * Creates an ephemeral znode at the given path if it does not exist yet.
+   * 
+   * @param path path of the znode to create.
+   */
+  private void createEphemeralZnode(final String path) throws KeeperException,
+      InterruptedException {
+    createZnode(path, CreateMode.EPHEMERAL);
+  }
+
+  private void createZnode(final String path, final CreateMode mode)
+      throws KeeperException, InterruptedException {
+    synchronized (zk) {
+      Stat s = zk.exists(path, false);
+      if (null == s) {
+        try {
+          zk.create(path, null, Ids.OPEN_ACL_UNSAFE, mode);
+        } catch (KeeperException.NodeExistsException nee) {
+          LOG.warn("Ignore because znode may be already created at " + path,
+              nee);
+        }
+      }
+    }
+  }
+
+  /*
+   * INNER CLASSES
+   */
+
+  /**
+   * Watcher that remembers whether it has been triggered and wakes up all
+   * threads waiting on the mutex of the enclosing client.
+   */
+  private class BarrierWatcher implements Watcher {
+    // written by the thread delivering the ZooKeeper event and polled by the
+    // task thread, which does not always hold a lock while reading it;
+    // volatile makes the update visible in either case.
+    private volatile boolean complete = false;
+
+    /**
+     * @return true once an event has been delivered to this watcher.
+     */
+    boolean isComplete() {
+      return this.complete;
+    }
+
+    @Override
+    public void process(WatchedEvent event) {
+      // set the flag before notifying, so that woken threads see it
+      this.complete = true;
+      synchronized (mutex) {
+        mutex.notifyAll();
+      }
+    }
+  }
+
+}

Propchange: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/zookeeper/ZooKeeperSyncClientImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/zookeeper/ZooKeeperSyncServerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/zookeeper/ZooKeeperSyncServerImpl.java?rev=1186921&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/zookeeper/ZooKeeperSyncServerImpl.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/zookeeper/ZooKeeperSyncServerImpl.java Thu Oct 20 16:58:17 2011
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.sync.zookeeper;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.bsp.sync.SyncServer;
+import org.apache.hama.util.BSPNetUtils;
+import org.apache.hama.zookeeper.QuorumPeer;
+
+/**
+ * 
+ * Launches a new ZooKeeper. Basically just used by YARN.
+ * 
+ */
+public class ZooKeeperSyncServerImpl implements SyncServer {
+
+  // configuration received in init(); start() hands it to the ZooKeeper
+  // launcher.
+  private Configuration conf;
+
+  /**
+   * Determines the address of the ZooKeeper server and stores it in the
+   * configuration under "hama.sync.server.address". A configured client port
+   * ("hama.zookeeper.property.clientPort") and a configured quorum
+   * ("hama.zookeeper.quorum") take precedence over the free port and the
+   * canonical host name of this machine.
+   * 
+   * @param conf configuration to read from and to write the address to.
+   * @return the same configuration instance, now containing the address.
+   */
+  @Override
+  public Configuration init(Configuration conf) throws Exception {
+    this.conf = conf;
+
+    final int freePort = BSPNetUtils.getFreePort(15600);
+    final String configuredPort = conf
+        .get("hama.zookeeper.property.clientPort");
+    final int port = configuredPort == null ? freePort : Integer
+        .parseInt(configuredPort);
+
+    final String localHost = BSPNetUtils.getCanonicalHostname();
+    final String quorum = conf.get("hama.zookeeper.quorum");
+    // since someone can set multiple quorums via comma separated string, we
+    // split on comma and just use the first one. The client itself just uses
+    // the "hama.zookeeper.quorum" property.
+    final String host = quorum == null ? localHost : quorum.split(",")[0];
+
+    conf.set("hama.sync.server.address", host + ":" + port);
+
+    return conf;
+  }
+
+  /**
+   * Launches ZooKeeper with the configuration received in init().
+   */
+  @Override
+  public void start() throws Exception {
+    QuorumPeer.runZooKeeper(conf);
+  }
+
+  /**
+   * Currently does nothing; the launched ZooKeeper is not stopped.
+   */
+  @Override
+  public void stopServer() {
+    // TODO this could be done somehow
+  }
+
+}

Propchange: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/zookeeper/ZooKeeperSyncServerImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message