incubator-hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r824269 - in /incubator/hama/trunk: ./ src/java/org/apache/hama/bsp/ src/test/org/apache/hama/bsp/
Date Mon, 12 Oct 2009 09:08:09 GMT
Author: edwardyoon
Date: Mon Oct 12 09:08:09 2009
New Revision: 824269

URL: http://svn.apache.org/viewvc?rev=824269&view=rev
Log:
Interface of the bsp

Added:
    incubator/hama/trunk/src/java/org/apache/hama/bsp/
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPConstants.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerInterface.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPRPCProtocolVersion.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/DefaultBSPPeer.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/package.html
    incubator/hama/trunk/src/test/org/apache/hama/bsp/
    incubator/hama/trunk/src/test/org/apache/hama/bsp/BSPPeerTest.java
Modified:
    incubator/hama/trunk/CHANGES.txt

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=824269&r1=824268&r2=824269&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Mon Oct 12 09:08:09 2009
@@ -4,6 +4,7 @@
 
   NEW FEATURES
   
+    HAMA-195: Interface of the bsp (hyunsik via edwardyoon)
     HAMA-185: Finds the eigenvalues and eigenvectors 
               associated with the symmetric matrix A (samuel)
     HAMA-171: Find the maximum absolute row sum using MapReduce (edwardyoon)

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPConstants.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPConstants.java?rev=824269&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPConstants.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPConstants.java Mon Oct 12 09:08:09 2009
@@ -0,0 +1,56 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.bsp;
+
+/**
+ * Configuration keys and default values shared by the BSP classes.
+ * Interface fields are implicitly public, static and final, so those
+ * modifiers are not repeated on the declarations.
+ */
+public interface BSPConstants {
+
+  /** Configuration key for the host name a BSP peer binds to. */
+  String PEER_HOST = "bsp.peer.hostname";
+  /** Host address used when {@link #PEER_HOST} is not configured. */
+  String DEFAULT_PEER_HOST = "0.0.0.0";
+
+  /** Configuration key for the port a BSP peer listens on. */
+  String PEER_PORT = "bsp.peer.port";
+  /** Port intended as the default when {@link #PEER_PORT} is not configured. */
+  int DEFAULT_PEER_PORT = 61000;
+
+  /**
+   * Pause, in milliseconds, that a peer sleeps between entering and leaving
+   * the synchronization barrier.
+   */
+  long ATLEAST_WAIT_TIME = 100;
+
+  /** Configuration key for the ZooKeeper node under which peers register. */
+  String ZOOKEEPER_ROOT = "bsp.zookeeper.root";
+  /** Root node used when {@link #ZOOKEEPER_ROOT} is not configured. */
+  String DEFAULT_ZOOKEEPER_ROOT = "/bsp";
+
+  /** Configuration key for the ZooKeeper server address (connect string). */
+  String ZOOKEEPER_SERVER_ADDRS = "zookeeper.server";
+  /** Parameter name for number of times to retry writes to ZooKeeper. */
+  String ZOOKEEPER_RETRIES = "zookeeper.retries";
+  /** Default number of times to retry writes to ZooKeeper. */
+  int DEFAULT_ZOOKEEPER_RETRIES = 5;
+  /** Parameter name for ZooKeeper pause between retries. In milliseconds. */
+  String ZOOKEEPER_PAUSE = "zookeeper.pause";
+  /** Default ZooKeeper pause value. In milliseconds. */
+  int DEFAULT_ZOOKEEPER_PAUSE = 2 * 1000;
+
+  /** A shared zero-length byte array. */
+  byte[] EMPTY_BYTE_ARRAY = new byte[0];
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java?rev=824269&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java Mon Oct 12 09:08:09 2009
@@ -0,0 +1,67 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * A message exchanged between BSP peers: a byte-array tag plus a byte-array
+ * payload, serialized through Hadoop's {@link Writable} contract as
+ * length-prefixed byte runs (tag first, then data).
+ */
+public class BSPMessage implements Writable {
+  protected byte[] tag;
+  protected byte[] data;
+
+  /**
+   * Creates an unpopulated message. Both arrays stay {@code null} until
+   * {@link #readFields(DataInput)} fills them in.
+   */
+  public BSPMessage() {
+  }
+
+  /**
+   * Creates a message holding private copies of the given arrays.
+   *
+   * @param tag the message tag; must not be {@code null}
+   * @param data the message payload; must not be {@code null}
+   */
+  public BSPMessage(byte[] tag, byte[] data) {
+    this.tag = tag.clone();
+    this.data = data.clone();
+  }
+
+  /**
+   * @return the tag array held by this message (the internal array, not a
+   *         copy)
+   */
+  public byte[] getTag() {
+    return this.tag;
+  }
+
+  /**
+   * @return the payload array held by this message (the internal array, not
+   *         a copy)
+   */
+  public byte[] getData() {
+    return this.data;
+  }
+
+  /**
+   * Replaces tag and payload with the values read from {@code in}; each is
+   * expected as an int length followed by that many bytes.
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int tagLength = in.readInt();
+    this.tag = new byte[tagLength];
+    in.readFully(this.tag);
+
+    int dataLength = in.readInt();
+    this.data = new byte[dataLength];
+    in.readFully(this.data);
+  }
+
+  /**
+   * Writes tag and payload to {@code out}, each as an int length followed by
+   * the bytes themselves.
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    final byte[] tagBytes = this.tag;
+    out.writeInt(tagBytes.length);
+    out.write(tagBytes);
+
+    final byte[] dataBytes = this.data;
+    out.writeInt(dataBytes.length);
+    out.write(dataBytes);
+  }
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java?rev=824269&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java Mon Oct 12 09:08:09 2009
@@ -0,0 +1,236 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RPC.Server;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
+
+/**
+ * A BSP peer. Messages handed to {@link #send} are queued per destination,
+ * pushed to the destination peers over Hadoop RPC by {@link #sync()}, and
+ * messages received from other peers are read back with
+ * {@link #getCurrentMessage()}. {@link #sync()} ends with a barrier built on
+ * ephemeral ZooKeeper nodes created below {@link #bspRoot}.
+ */
+public class BSPPeer implements DefaultBSPPeer, Watcher, BSPPeerInterface {
+  public static final Log LOG = LogFactory.getLog(BSPPeer.class);
+
+  protected Configuration conf;
+
+  protected InetSocketAddress masterAddr = null;
+  protected Server server = null;
+  protected ZooKeeper zk = null;
+  /**
+   * Monitor on which the barrier loops wait for ZooKeeper watch events. It
+   * has to be a distinct object per peer: an autoboxed {@code 0} comes from
+   * the shared Integer cache, which would make every peer in the JVM wait
+   * and notify on one and the same monitor.
+   */
+  protected Integer mutex = new Integer(0);
+
+  protected final String serverName;
+  protected final String bindAddress;
+  protected final int bindPort;
+  protected final String bspRoot;
+
+  /** RPC proxies to other peers, keyed by their address. */
+  protected final Map<InetSocketAddress, BSPPeerInterface> peers =
+      new ConcurrentHashMap<InetSocketAddress, BSPPeerInterface>();
+  /** Messages queued by {@link #send} and not yet delivered, per peer. */
+  protected final Map<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>> outgoingQueues =
+      new ConcurrentHashMap<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>>();
+  /** Messages received through {@link #put(BSPMessage)}. */
+  protected final ConcurrentLinkedQueue<BSPMessage> localQueue =
+      new ConcurrentLinkedQueue<BSPMessage>();
+
+  /**
+   * Reads the bind host, bind port and ZooKeeper root from {@code conf}
+   * (falling back to the defaults in {@link BSPConstants}) and then starts
+   * the RPC server and the ZooKeeper client via {@link #reinitialize()}.
+   *
+   * @param conf the configuration to read settings from
+   * @throws IOException declared for callers; start-up failures inside
+   *           {@link #reinitialize()} are logged, not thrown
+   */
+  public BSPPeer(Configuration conf) throws IOException {
+    this.conf = conf;
+
+    bindAddress = conf.get(PEER_HOST, DEFAULT_PEER_HOST);
+    // The port default must be DEFAULT_PEER_PORT; falling back to the host
+    // default made the integer parse fail whenever the port was not set.
+    bindPort = conf.getInt(PEER_PORT, DEFAULT_PEER_PORT);
+    // Built from the effective host and port, with a separator so that
+    // different host/port pairs cannot collapse into the same name.
+    serverName = bindAddress + ":" + bindPort;
+    bspRoot = conf.get(ZOOKEEPER_ROOT, DEFAULT_ZOOKEEPER_ROOT);
+
+    reinitialize();
+  }
+
+  /**
+   * Starts the RPC server on the configured address and opens the ZooKeeper
+   * connection. Failures are logged; the corresponding field is left
+   * unchanged in that case.
+   */
+  public void reinitialize() {
+    try {
+      server = RPC.getServer(this, bindAddress, bindPort, conf);
+      server.start();
+    } catch (IOException e) {
+      LOG.error("[" + serverName + "] failed to start the RPC server", e);
+    }
+
+    try {
+      zk = new ZooKeeper(conf.get(ZOOKEEPER_SERVER_ADDRS), 3000, this);
+    } catch (IOException e) {
+      LOG.error("[" + serverName + "] failed to connect to ZooKeeper", e);
+    }
+  }
+
+  /**
+   * @return the next received message, or {@code null} when none is queued
+   */
+  @Override
+  public BSPMessage getCurrentMessage() throws IOException {
+    return localQueue.poll();
+  }
+
+  /**
+   * Queues {@code msg} for the peer at {@code hostname}. Nothing is
+   * transmitted until {@link #sync()} is called.
+   *
+   * @see org.apache.hama.bsp.DefaultBSPPeer#send(java.net.InetSocketAddress,
+   *      org.apache.hama.bsp.BSPMessage)
+   */
+  @Override
+  public void send(InetSocketAddress hostname, BSPMessage msg)
+      throws IOException {
+
+    ConcurrentLinkedQueue<BSPMessage> queue = outgoingQueues.get(hostname);
+    if (queue == null) {
+      queue = new ConcurrentLinkedQueue<BSPMessage>();
+      outgoingQueues.put(hostname, queue);
+    }
+    queue.add(msg);
+  }
+
+  /**
+   * Delivers every queued outgoing message to its destination peer, then
+   * passes through the ZooKeeper barrier.
+   *
+   * @throws IOException if a destination peer cannot be reached or rejects a
+   *           message
+   * @see org.apache.hama.bsp.DefaultBSPPeer#sync()
+   */
+  @Override
+  public void sync() throws IOException, KeeperException, InterruptedException {
+    for (Entry<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>> entry
+        : this.outgoingQueues.entrySet()) {
+      InetSocketAddress addr = entry.getKey();
+
+      BSPPeerInterface peer = peers.get(addr);
+      if (peer == null) {
+        peer = getBSPPeerConnection(addr);
+      }
+      if (peer == null) {
+        throw new IOException("[" + serverName
+            + "] could not connect to BSP peer " + addr);
+      }
+
+      // Drain the queue instead of merely iterating over it; otherwise the
+      // same messages would be delivered again by every later sync(). A
+      // message is removed only after put() succeeded, so it stays queued
+      // when the delivery fails.
+      ConcurrentLinkedQueue<BSPMessage> queue = entry.getValue();
+      BSPMessage msg;
+      while ((msg = queue.peek()) != null) {
+        peer.put(msg);
+        queue.poll();
+      }
+    }
+    enterBarrier();
+    Thread.sleep(ATLEAST_WAIT_TIME); // TODO - This is temporary work because
+    // it can be affected by network condition,
+    // the number of peers, and the load of zookeeper.
+    // It should be replaced by a more robust mechanism.
+    leaveBarrier();
+
+  }
+
+  /**
+   * Creates this peer's ephemeral node below {@link #bspRoot} and blocks
+   * until the number of children of {@link #bspRoot} reaches the value
+   * configured as {@code bsp.peers.num}.
+   *
+   * @return always {@code true}
+   * @throws InterruptedException if the thread is interrupted while creating
+   *           the node or while waiting
+   */
+  protected boolean enterBarrier() throws KeeperException, InterruptedException {
+    LOG.debug("[" + serverName + "] enter the enterbarrier");
+    try {
+      zk.create(bspRoot + "/" + serverName, EMPTY_BYTE_ARRAY,
+          Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+    } catch (KeeperException e) {
+      // Kept best-effort as before (for instance the node may already
+      // exist); an interrupt, however, is no longer swallowed.
+      LOG.error("[" + serverName + "] could not create the barrier node", e);
+    }
+
+    // Loop-invariant, so parsed once instead of on every wake-up.
+    final int expectedPeers = Integer.parseInt(conf.get("bsp.peers.num"));
+
+    while (true) {
+      synchronized (mutex) {
+        // Passing true re-registers this peer as watcher on the children.
+        List<String> list = zk.getChildren(bspRoot, true);
+        if (list.size() < expectedPeers) {
+          mutex.wait();
+        } else {
+          return true;
+        }
+      }
+    }
+  }
+
+  /**
+   * Deletes this peer's node below {@link #bspRoot} and blocks until
+   * {@link #bspRoot} has no children left.
+   *
+   * @return always {@code true}
+   */
+  protected boolean leaveBarrier() throws KeeperException, InterruptedException {
+    zk.delete(bspRoot + "/" + serverName, 0);
+
+    while (true) {
+      synchronized (mutex) {
+        List<String> list = zk.getChildren(bspRoot, true);
+        if (list.size() > 0) {
+          mutex.wait();
+        } else {
+          LOG.debug("[" + serverName + "] leave from the leaveBarrier");
+          return true;
+        }
+      }
+    }
+  }
+
+  /**
+   * ZooKeeper watcher callback: wakes the threads blocked in the barrier
+   * loops so that they re-read the children of {@link #bspRoot}.
+   */
+  @Override
+  public void process(WatchedEvent event) {
+    synchronized (mutex) {
+      mutex.notifyAll();
+    }
+  }
+
+  /**
+   * Releases the resources held by this peer: the RPC proxies to other
+   * peers, the RPC server and the ZooKeeper session.
+   *
+   * @throws IOException if the thread is interrupted while the ZooKeeper
+   *           session is being closed
+   */
+  @Override
+  public void close() throws IOException {
+    for (BSPPeerInterface peer : peers.values()) {
+      RPC.stopProxy(peer);
+    }
+    peers.clear();
+
+    if (server != null) {
+      server.stop();
+    }
+
+    if (zk != null) {
+      try {
+        zk.close();
+      } catch (InterruptedException e) {
+        // Keep the interrupt visible to the caller's thread.
+        Thread.currentThread().interrupt();
+        throw new IOException("[" + serverName
+            + "] interrupted while closing the ZooKeeper session", e);
+      }
+    }
+  }
+
+  /**
+   * @return always {@code true} in this implementation
+   */
+  @Override
+  public boolean isRunning() {
+    return true;
+  }
+
+  /**
+   * Appends a message received from another peer to the local queue.
+   */
+  @Override
+  public void put(BSPMessage msg) throws IOException {
+    this.localQueue.add(msg);
+  }
+
+  /**
+   * @return {@link BSPPeerInterface#versionID}, regardless of the arguments
+   */
+  @Override
+  public long getProtocolVersion(String arg0, long arg1) throws IOException {
+    return BSPPeerInterface.versionID;
+  }
+
+  /**
+   * Returns the cached RPC proxy for {@code addr}, creating and caching one
+   * when none exists yet.
+   *
+   * @param addr the address of the remote peer
+   * @return the proxy, or {@code null} when the connection could not be
+   *         established (the failure is logged and nothing is cached)
+   */
+  protected BSPPeerInterface getBSPPeerConnection(InetSocketAddress addr) {
+    synchronized (this.peers) {
+      BSPPeerInterface peer = peers.get(addr);
+
+      if (peer == null) {
+        try {
+          peer = (BSPPeerInterface) RPC.getProxy(BSPPeerInterface.class,
+              BSPPeerInterface.versionID, addr, this.conf);
+          // Cache only a real proxy: the concurrent map rejects null values.
+          this.peers.put(addr, peer);
+        } catch (IOException e) {
+          LOG.error("[" + serverName + "] could not connect to BSP peer "
+              + addr, e);
+        }
+      }
+
+      return peer;
+    }
+  }
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerInterface.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerInterface.java?rev=824269&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerInterface.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerInterface.java Mon Oct 12 09:08:09 2009
@@ -0,0 +1,29 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+/**
+ * RPC protocol through which one BSP peer hands messages to another.
+ * Interface methods are implicitly public, so the modifier is omitted.
+ */
+public interface BSPPeerInterface extends BSPRPCProtocolVersion {
+
+  /**
+   * Hands a message over to this peer.
+   *
+   * @param msg the message to deliver
+   * @throws IOException if the message cannot be delivered
+   */
+  void put(BSPMessage msg) throws IOException;
+
+  /**
+   * @return whether this peer is running
+   */
+  boolean isRunning();
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPRPCProtocolVersion.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPRPCProtocolVersion.java?rev=824269&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPRPCProtocolVersion.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPRPCProtocolVersion.java Mon Oct 12 09:08:09 2009
@@ -0,0 +1,35 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import org.apache.hadoop.ipc.VersionedProtocol;
+
+/**
+ * Holds the version number of the BSP RPC protocol. The RPC interfaces of
+ * this package extend it so that they share a single version constant.
+ */
+public interface BSPRPCProtocolVersion extends VersionedProtocol {
+
+  /**
+   * Version of the BSP RPC interface.
+   *
+   * History:
+   * 0 - Alpha Version
+   */
+  long versionID = 0L;
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/DefaultBSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/DefaultBSPPeer.java?rev=824269&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/DefaultBSPPeer.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/DefaultBSPPeer.java Mon Oct 12 09:08:09 2009
@@ -0,0 +1,59 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import java.net.InetSocketAddress;
+
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Operations a BSP peer offers to the program running on it: sending
+ * messages to other peers, reading received messages, and synchronizing
+ * with the other peers.
+ */
+public interface DefaultBSPPeer extends Closeable, BSPConstants {
+
+  /**
+   * Sends a message to the BSP peer at the given address. Messages sent by
+   * this method are not guaranteed to be received in the order in which they
+   * were sent.
+   *
+   * @param hostname the address of the destination peer
+   * @param msg the message (tag and data) to send
+   * @throws IOException if the message cannot be sent
+   */
+  void send(InetSocketAddress hostname, BSPMessage msg) throws IOException;
+
+  /**
+   * @return the current message
+   * @throws IOException if the message cannot be read
+   */
+  BSPMessage getCurrentMessage() throws IOException;
+
+  /**
+   * Synchronizes with the other BSP peers: the messages passed to
+   * {@link #send} are handed over to their destination peers.
+   *
+   * @throws IOException if a message cannot be delivered
+   * @throws KeeperException if a ZooKeeper operation fails
+   * @throws InterruptedException if the calling thread is interrupted
+   */
+  void sync() throws IOException, KeeperException, InterruptedException;
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/package.html
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/package.html?rev=824269&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/package.html (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/package.html Mon Oct 12 09:08:09 2009
@@ -0,0 +1,23 @@
+<html>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<body>
+The BSP package of Hama is a framework for developing BSP-style applications.
+</body>
+</html>

Added: incubator/hama/trunk/src/test/org/apache/hama/bsp/BSPPeerTest.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/bsp/BSPPeerTest.java?rev=824269&view=auto
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/bsp/BSPPeerTest.java (added)
+++ incubator/hama/trunk/src/test/org/apache/hama/bsp/BSPPeerTest.java Mon Oct 12 09:08:09 2009
@@ -0,0 +1,173 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hama.HamaCluster;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Starts {@link #NUM_PEER} BSP peers in one JVM, lets each of them send
+ * random messages and call {@code sync()} for {@link #ROUND} rounds, and
+ * checks that the tag of every received message matches the tail of its
+ * payload.
+ */
+public class BSPPeerTest extends HamaCluster implements Watcher {
+  private Log LOG = LogFactory.getLog(BSPPeerTest.class);
+
+  private static final int NUM_PEER = 35;
+  private static final int ROUND = 10;
+  private static final int PAYLOAD = 1024; // 1kb in default
+  /** Number of trailing payload bytes that are copied into the tag. */
+  private static final int TAG_LENGTH = 128;
+  List<BSPPeerThread> list = new ArrayList<BSPPeerThread>(NUM_PEER);
+  Configuration conf;
+
+  /**
+   * First failure raised in any peer thread. Failures in worker threads do
+   * not reach the test runner on their own, so they are collected here and
+   * rethrown by {@link #testSync()}.
+   */
+  private Throwable failure;
+
+  public BSPPeerTest() {
+    this.conf = getConf();
+  }
+
+  /** Logs a worker-thread failure and remembers the first one. */
+  private synchronized void recordFailure(Throwable t) {
+    LOG.error("BSP peer thread failed", t);
+    if (failure == null) {
+      failure = t;
+    }
+  }
+
+  /** @return the first recorded worker-thread failure, or {@code null} */
+  private synchronized Throwable firstFailure() {
+    return failure;
+  }
+
+  /**
+   * Makes sure the default BSP root node exists in ZooKeeper before the
+   * peers are started.
+   */
+  public void setUp() throws Exception {
+    super.setUp();
+
+    ZooKeeper zk = new ZooKeeper("localhost:21810", 3000, this);
+    try {
+      Stat s = null;
+      try {
+        s = zk.exists(BSPConstants.DEFAULT_ZOOKEEPER_ROOT, false);
+      } catch (Exception e) {
+        // Log the exception itself; the Stat is still null at this point.
+        LOG.error(e);
+      }
+
+      if (s == null) {
+        try {
+          zk.create(BSPConstants.DEFAULT_ZOOKEEPER_ROOT, new byte[0],
+              Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        } catch (KeeperException e) {
+          LOG.error(e);
+        } catch (InterruptedException e) {
+          LOG.error(e);
+        }
+      }
+    } finally {
+      // The root node is persistent, so this session is no longer needed.
+      zk.close();
+    }
+  }
+
+  /** Runs one BSP peer through {@link #ROUND} send/sync/verify rounds. */
+  public class BSPPeerThread extends Thread {
+    private BSPPeer peer;
+    private int MAXIMUM_DURATION = 5;
+    private Random r = new Random();
+
+    public BSPPeerThread(Configuration conf) throws IOException {
+      this.peer = new BSPPeer(conf);
+    }
+
+    @Override
+    public void run() {
+      int randomTime;
+      byte[] dummyData = new byte[PAYLOAD];
+      BSPMessage msg = null;
+      InetSocketAddress addr = null;
+
+      for (int i = 0; i < ROUND; i++) {
+        randomTime = r.nextInt(MAXIMUM_DURATION) + 5;
+
+        for (int j = 0; j < 10; j++) {
+          r.nextBytes(dummyData);
+          msg = new BSPMessage(Bytes.tail(dummyData, TAG_LENGTH), dummyData);
+
+          addr = new InetSocketAddress("localhost", 30000 + j);
+          try {
+            peer.send(addr, msg);
+          } catch (IOException e) {
+            recordFailure(e);
+          }
+        }
+
+        try {
+          Thread.sleep(randomTime * 1000L);
+          peer.sync();
+        } catch (IOException e) {
+          recordFailure(e);
+        } catch (KeeperException e) {
+          recordFailure(e);
+        } catch (InterruptedException e) {
+          // Treat an interrupt as a request to stop this peer thread.
+          Thread.currentThread().interrupt();
+          recordFailure(e);
+          return;
+        }
+
+        // Record a failed verification but keep going: the remaining peers
+        // would otherwise wait for this one at the next barrier forever.
+        try {
+          verifyPayload();
+        } catch (Throwable t) {
+          recordFailure(t);
+        }
+      }
+    }
+
+    /**
+     * Drains the received messages and checks that each tag equals the last
+     * {@link #TAG_LENGTH} bytes of the corresponding payload.
+     */
+    private void verifyPayload() {
+      System.out.println("[" + getName() + "] verifying "
+          + peer.localQueue.size() + " messages");
+      BSPMessage msg = null;
+
+      try {
+        while ((msg = peer.getCurrentMessage()) != null) {
+          assertEquals(0, Bytes.compareTo(msg.tag, 0, TAG_LENGTH, msg.data,
+              msg.data.length - TAG_LENGTH, TAG_LENGTH));
+        }
+      } catch (IOException e) {
+        recordFailure(e);
+      }
+
+      peer.localQueue.clear();
+    }
+  }
+
+  public void testSync() throws InterruptedException, IOException {
+
+    BSPPeerThread thread;
+    for (int i = 0; i < NUM_PEER; i++) {
+      conf.set("bsp.peers.num", String.valueOf(NUM_PEER));
+      conf.set(BSPConstants.PEER_HOST, "localhost");
+      conf.set(BSPConstants.PEER_PORT, String.valueOf(30000 + i));
+      conf.set(BSPConstants.ZOOKEEPER_SERVER_ADDRS, "localhost:21810");
+      thread = new BSPPeerThread(conf);
+      list.add(thread);
+    }
+
+    for (int i = 0; i < NUM_PEER; i++) {
+      list.get(i).start();
+    }
+
+    for (int i = 0; i < NUM_PEER; i++) {
+      list.get(i).join();
+    }
+
+    // Surface whatever went wrong inside the peer threads.
+    Throwable first = firstFailure();
+    if (first != null) {
+      AssertionError error = new AssertionError("BSP peer thread failed: "
+          + first);
+      error.initCause(first);
+      throw error;
+    }
+  }
+
+  @Override
+  public void process(WatchedEvent event) {
+  }
+}



Mime
View raw message