Return-Path: X-Original-To: apmail-hama-commits-archive@www.apache.org Delivered-To: apmail-hama-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 532E2D866 for ; Sun, 5 Aug 2012 13:14:24 +0000 (UTC) Received: (qmail 83050 invoked by uid 500); 5 Aug 2012 13:14:24 -0000 Delivered-To: apmail-hama-commits-archive@hama.apache.org Received: (qmail 83027 invoked by uid 500); 5 Aug 2012 13:14:24 -0000 Mailing-List: contact commits-help@hama.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hama.apache.org Delivered-To: mailing list commits@hama.apache.org Received: (qmail 83019 invoked by uid 99); 5 Aug 2012 13:14:24 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 05 Aug 2012 13:14:24 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 05 Aug 2012 13:14:15 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 2D84523888FD for ; Sun, 5 Aug 2012 13:13:30 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1369575 [2/2] - in /hama/trunk: ./ conf/ core/src/main/java/org/apache/hama/ core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/bsp/ft/ core/src/main/java/org/apache/hama/bsp/message/ core/src/main/java/org/apache/h... 
Date: Sun, 05 Aug 2012 13:13:28 -0000 To: commits@hama.apache.org From: surajsmenon@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120805131330.2D84523888FD@eris.apache.org> Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java?rev=1369575&r1=1369574&r2=1369575&view=diff ============================================================================== --- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java (original) +++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java Sun Aug 5 13:13:26 2012 @@ -22,7 +22,9 @@ import java.net.InetSocketAddress; import java.util.Collection; import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedList; import java.util.Map.Entry; +import java.util.Queue; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -31,6 +33,7 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.io.Writable; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hama.bsp.BSPMessageBundle; import org.apache.hama.bsp.BSPPeer; import org.apache.hama.bsp.BSPPeerImpl; import org.apache.hama.bsp.TaskAttemptID; @@ -61,6 +64,9 @@ public abstract class AbstractMessageMan // the task attempt id protected TaskAttemptID attemptId; + // List of listeners for all the sent messages + protected Queue> messageListenerQueue; + /* * (non-Javadoc) * @see org.apache.hama.bsp.message.MessageManager#init(org.apache.hama.bsp. 
@@ -70,12 +76,14 @@ public abstract class AbstractMessageMan @Override public void init(TaskAttemptID attemptId, BSPPeer peer, Configuration conf, InetSocketAddress peerAddress) { + this.messageListenerQueue = new LinkedList>(); this.attemptId = attemptId; this.peer = peer; this.conf = conf; this.peerAddress = peerAddress; localQueue = getQueue(); localQueueForNextIteration = getSynchronizedQueue(); + } /* @@ -84,18 +92,22 @@ public abstract class AbstractMessageMan */ @Override public void close() { - Collection> values = outgoingQueues.values(); - for (MessageQueue msgQueue : values) { - msgQueue.close(); - } - localQueue.close(); - // remove possible disk queues from the path try { - FileSystem.get(conf).delete( - DiskQueue.getQueueDir(conf, attemptId, - conf.get(DiskQueue.DISK_QUEUE_PATH_KEY)), true); - } catch (IOException e) { - LOG.warn("Queue dir couldn't be deleted"); + Collection> values = outgoingQueues.values(); + for (MessageQueue msgQueue : values) { + msgQueue.close(); + } + localQueue.close(); + // remove possible disk queues from the path + try { + FileSystem.get(conf).delete( + DiskQueue.getQueueDir(conf, attemptId, + conf.get(DiskQueue.DISK_QUEUE_PATH_KEY)), true); + } catch (IOException e) { + LOG.warn("Queue dir couldn't be deleted"); + } + } finally { + notifyClose(); } } @@ -139,6 +151,7 @@ public abstract class AbstractMessageMan localQueue = localQueueForNextIteration.getMessageQueue(); localQueue.prepareRead(); localQueueForNextIteration = getSynchronizedQueue(); + notifyInit(); } /* @@ -163,6 +176,7 @@ public abstract class AbstractMessageMan queue.add(msg); peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_SENT, 1L); outgoingQueues.put(targetPeerAddress, queue); + notifySentMessage(peerName, msg); } /* @@ -206,4 +220,68 @@ public abstract class AbstractMessageMan this.conf = conf; } + private void notifySentMessage(String peerName, M message) { + Iterator> iterator = this.messageListenerQueue + .iterator(); + while 
(iterator.hasNext()) { + iterator.next().onMessageSent(peerName, message); + } + } + + private void notifyReceivedMessage(M message) throws IOException { + Iterator> iterator = this.messageListenerQueue + .iterator(); + while (iterator.hasNext()) { + iterator.next().onMessageReceived(message); + } + } + + private void notifyInit() { + Iterator> iterator = this.messageListenerQueue + .iterator(); + while (iterator.hasNext()) { + iterator.next().onInitialized(); + } + } + + private void notifyClose() { + Iterator> iterator = this.messageListenerQueue + .iterator(); + while (iterator.hasNext()) { + iterator.next().onClose(); + } + } + + + + @Override + public void registerListener(MessageEventListener listener) + throws IOException { + if(listener != null) + this.messageListenerQueue.add(listener); + + } + + @SuppressWarnings("unchecked") + @Override + public void loopBackMessages(BSPMessageBundle bundle) throws IOException{ + for (Writable message : bundle.getMessages()) { + loopBackMessage((M)message); + } + + } + + @SuppressWarnings("unchecked") + @Override + public void loopBackMessage(Writable message) throws IOException{ + this.localQueueForNextIteration.add((M)message); + peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED, 1L); + notifyReceivedMessage((M)message); + + } + + + + + } Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java?rev=1369575&r1=1369574&r2=1369575&view=diff ============================================================================== --- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java (original) +++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java Sun Aug 5 13:13:26 2012 @@ -25,7 +25,6 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import 
java.util.HashMap; -import java.util.Iterator; import org.apache.avro.AvroRemoteException; import org.apache.avro.ipc.NettyServer; @@ -61,14 +60,8 @@ public final class AvroMessageManagerImp server.close(); } - public void put(BSPMessageBundle messages) { - peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED, - messages.getMessages().size()); - Iterator iterator = messages.getMessages().iterator(); - while (iterator.hasNext()) { - this.localQueueForNextIteration.add(iterator.next()); - iterator.remove(); - } + public void put(BSPMessageBundle messages) throws IOException { + this.loopBackMessages(messages); } @SuppressWarnings("unchecked") @@ -139,5 +132,4 @@ public final class AvroMessageManagerImp return ByteBuffer.wrap(data); } } - } Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManager.java URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManager.java?rev=1369575&r1=1369574&r2=1369575&view=diff ============================================================================== --- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManager.java (original) +++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManager.java Sun Aug 5 13:13:26 2012 @@ -17,6 +17,8 @@ */ package org.apache.hama.bsp.message; +import java.io.IOException; + import org.apache.hadoop.io.Writable; import org.apache.hama.bsp.BSPMessageBundle; import org.apache.hama.bsp.message.compress.BSPCompressedBundle; @@ -35,7 +37,7 @@ public interface HadoopMessageManager messages); + public void put(BSPMessageBundle messages) throws IOException; /** * This method puts a compressed message bundle for the next iteration. 
@@ -51,6 +53,6 @@ public interface HadoopMessageManager messages) { - for (M message : messages.getMessages()) { - this.localQueueForNextIteration.add(message); - } + public final void put(BSPMessageBundle messages) throws IOException { + loopBackMessages(messages); } @Override - public final void put(BSPCompressedBundle compMsgBundle) { + public final void put(BSPCompressedBundle compMsgBundle) throws IOException { BSPMessageBundle bundle = compressor.decompressBundle(compMsgBundle); - for (M message : bundle.getMessages()) { - this.localQueueForNextIteration.add(message); - } + loopBackMessages(bundle); } @Override @@ -141,4 +135,5 @@ public final class HadoopMessageManagerI return versionID; } + } Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java?rev=1369575&r1=1369574&r2=1369575&view=diff ============================================================================== --- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java (original) +++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java Sun Aug 5 13:13:26 2012 @@ -34,7 +34,7 @@ import org.apache.hama.bsp.TaskAttemptID * */ public interface MessageManager { - + public static final String QUEUE_TYPE_CLASS = "hama.messenger.queue.class"; /** @@ -96,4 +96,25 @@ public interface MessageManager bundle) throws IOException; + + /** + * Send the message to self to receive in the next superstep. + */ + public void loopBackMessage(Writable message) throws IOException; + + /** + * Register a listener for the events in message manager. + * + * @param listener MessageEventListener object that processes the + * messages sent to remote peer. 
+ * @throws IOException + */ + public void registerListener(MessageEventListener listener) + throws IOException; + + } Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncClient.java URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncClient.java?rev=1369575&r1=1369574&r2=1369575&view=diff ============================================================================== --- hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncClient.java (original) +++ hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncClient.java Sun Aug 5 13:13:26 2012 @@ -17,9 +17,10 @@ */ package org.apache.hama.bsp.sync; -import org.apache.hadoop.conf.Configuration; +import java.io.IOException; + +import org.apache.hadoop.io.Writable; import org.apache.hama.bsp.BSPJobID; -import org.apache.hama.bsp.TaskAttemptID; /** * Basic interface for a client that connects to a sync server. @@ -28,82 +29,85 @@ import org.apache.hama.bsp.TaskAttemptID public interface SyncClient { /** - * Init will be called within a spawned task, it should be used to initialize - * the inner structure and fields, e.G. a zookeeper client or an rpc - * connection to the real sync daemon. - * - * @throws Exception + * Construct key in the format required by the SyncClient for storing and + * retrieving information. This function is recommended to use to construct + * keys for storing keys. + * @param jobId The BSP Job Id. + * @param args The list of String objects that would be used to construct key + * @return The key consisting of entities provided in the required format. */ - public void init(Configuration conf, BSPJobID jobId, TaskAttemptID taskId) - throws Exception; + public String constructKey(BSPJobID jobId, String ... args); /** - * Enters the barrier before the message sending in each superstep. 
- * - * @param jobId the jobs ID - * @param taskId the tasks ID - * @param superstep the superstep of the task - * @throws SyncException + * Stores value for the specified key. + * @param key The key for which value should be stored. It is recommended to use + * constructKey to create key object. + * @param value The value to be stored. + * @param permanent true if the value should be persisted after end of session. + * @param listener Listener object that provides asynchronous updates on the state + * of information stored under the key. + * @return true if the operation was successful. */ - public void enterBarrier(BSPJobID jobId, TaskAttemptID taskId, long superstep) - throws SyncException; + public boolean storeInformation(String key, Writable value, + boolean permanent, SyncEventListener listener); /** - * Leaves the barrier after all communication has been done, this is usually - * the end of a superstep. - * - * @param jobId the jobs ID - * @param taskId the tasks ID - * @param superstep the superstep of the task - * @throws SyncException + * Retrieve value previously stored for the key. + * @param key The key for which value was stored. + * @param valueHolder The expected class instance of value to be extracted + * @return the value if found. Returns null if there was any error or if there + * was no value stored for the key. */ - public void leaveBarrier(BSPJobID jobId, TaskAttemptID taskId, long superstep) - throws SyncException; + public boolean getInformation(String key, Writable valueHolder); /** - * Registers a specific task with a its host and port to the sync daemon. - * - * @param jobId the jobs ID - * @param taskId the tasks ID - * @param hostAddress the host where the sync server resides - * @param port the port where the sync server is up + * Store new key in key set. + * @param key The key to be saved in key set. It is recommended to use + * constructKey to create key object. + * @param permanent true if the value should be persisted after end of session. 
+ * @param listener Listener object that asynchronously notifies the events + * related to the key. + * @return true if operation was successful. */ - public void register(BSPJobID jobId, TaskAttemptID taskId, - String hostAddress, long port); + public boolean addKey(String key, boolean permanent, SyncEventListener listener); /** - * Returns all registered tasks within the sync daemon. They have to be - * ordered ascending by their task id. - * - * @param taskId the tasks ID - * @return an ordered string array of host:port pairs of all tasks - * connected to the daemon. + * Check if key was previously stored. + * @param key The value of the key. + * @return true if the key exists. */ - public String[] getAllPeerNames(TaskAttemptID taskId); + public boolean hasKey(String key); + + /** + * Get list of child keys stored under the key provided. + * @param key The key whose child key set are to be found. + * @param listener Listener object that asynchronously notifies the changes + * under the provided key + * @return Array of child keys. + */ + public String[] getChildKeySet(String key, SyncEventListener listener); /** - * TODO this has currently no use. Could later be used to deregister tasks - * from the barrier during runtime if they are finished. Something equal to - * voteToHalt() in Pregel. - * - * @param jobId - * @param taskId - * @param hostAddress - * @param port + * Register a listener for events on the key. + * @param key The key on which an event listener should be registered. + * @param event for which the listener is registered for. + * @param listener The event listener that defines how to process the event. + * @return true if the operation is successful. */ - public void deregisterFromBarrier(BSPJobID jobId, TaskAttemptID taskId, - String hostAddress, long port); + public boolean registerListener(String key, SyncEvent event, + SyncEventListener listener); /** - * This stops the sync daemon. Only used in YARN. 
+ * Delete the key and the information stored under it. + * @param key + * @param listener + * @return */ - public void stopServer(); - + public boolean remove(String key, SyncEventListener listener); + /** - * This method should close all used resources, e.G. a ZooKeeper instance. * - * @throws InterruptedException */ - public void close() throws InterruptedException; - + public void close() throws IOException; + } Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncServiceFactory.java URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncServiceFactory.java?rev=1369575&r1=1369574&r2=1369575&view=diff ============================================================================== --- hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncServiceFactory.java (original) +++ hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncServiceFactory.java Sun Aug 5 13:13:26 2012 @@ -22,18 +22,27 @@ import org.apache.hadoop.util.Reflection public class SyncServiceFactory { public static final String SYNC_SERVER_CLASS = "hama.sync.server.class"; - public static final String SYNC_CLIENT_CLASS = "hama.sync.client.class"; + public static final String SYNC_PEER_CLASS = "hama.sync.peer.class"; + public static final String SYNC_MASTER_CLASS = "hama.sync.master.class"; /** * Returns a sync client via reflection based on what was configured. 
*/ - public static SyncClient getSyncClient(Configuration conf) + public static PeerSyncClient getPeerSyncClient(Configuration conf) throws ClassNotFoundException { - return (SyncClient) ReflectionUtils - .newInstance(conf.getClassByName(conf.get(SYNC_CLIENT_CLASS, + return (PeerSyncClient) ReflectionUtils + .newInstance(conf.getClassByName(conf.get(SYNC_PEER_CLASS, ZooKeeperSyncClientImpl.class.getName())), conf); } + + public static SyncClient getMasterSyncClient(Configuration conf) + throws ClassNotFoundException { + return (SyncClient) ReflectionUtils + .newInstance(conf.getClassByName(conf.get(SYNC_MASTER_CLASS, + ZKSyncBSPMasterClient.class.getName())), conf); + } + /** * Returns a sync server via reflection based on what was configured. */ Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java?rev=1369575&r1=1369574&r2=1369575&view=diff ============================================================================== --- hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java (original) +++ hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java Sun Aug 5 13:13:26 2012 @@ -17,10 +17,6 @@ */ package org.apache.hama.bsp.sync; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; import java.io.IOException; import java.net.InetSocketAddress; import java.util.Collections; @@ -47,7 +43,8 @@ import org.apache.zookeeper.data.Stat; * This client class abstracts the use of our zookeeper sync code. 
* */ -public class ZooKeeperSyncClientImpl implements SyncClient, Watcher { +public class ZooKeeperSyncClientImpl extends ZKSyncClient implements + PeerSyncClient { /* * TODO maybe extract an abstract class and let the subclasses implement @@ -81,6 +78,8 @@ public class ZooKeeperSyncClientImpl imp int bindPort = conf .getInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT); + initialize(this.zk, bspRoot); + peerAddress = new InetSocketAddress(bindAddress, bindPort); LOG.info("Start connecting to Zookeeper! At " + peerAddress); numBSPTasks = conf.getInt("bsp.peers.num", 1); @@ -93,14 +92,14 @@ public class ZooKeeperSyncClientImpl imp try { synchronized (zk) { - createZnode(bspRoot); - final String pathToJobIdZnode = bspRoot + "/" - + taskId.getJobID().toString(); - createZnode(pathToJobIdZnode); - final String pathToSuperstepZnode = pathToJobIdZnode + "/" + superstep; - createZnode(pathToSuperstepZnode); + + final String pathToSuperstepZnode = + constructKey(taskId.getJobID(), "sync", ""+superstep); + + writeNode(pathToSuperstepZnode, null, true, null); BarrierWatcher barrierWatcher = new BarrierWatcher(); - // this is really needed to register the barrier watcher, don't remove this line! + // this is really needed to register the barrier watcher, don't remove + // this line! 
zk.exists(pathToSuperstepZnode + "/ready", barrierWatcher); zk.create(getNodeName(taskId, superstep), null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); @@ -131,7 +130,7 @@ public class ZooKeeperSyncClientImpl imp } else { LOG.debug("---> at superstep: " + superstep + " task that is creating /ready znode:" + taskId.toString()); - createEphemeralZnode(pathToSuperstepZnode + "/ready"); + writeNode(pathToSuperstepZnode + "/ready", null, false, null); } } } catch (Exception e) { @@ -143,8 +142,10 @@ public class ZooKeeperSyncClientImpl imp public void leaveBarrier(final BSPJobID jobId, final TaskAttemptID taskId, final long superstep) throws SyncException { try { - final String pathToSuperstepZnode = bspRoot + "/" - + taskId.getJobID().toString() + "/" + superstep; +// final String pathToSuperstepZnode = bspRoot + "/" +// + taskId.getJobID().toString() + "/" + superstep; + final String pathToSuperstepZnode = + constructKey(taskId.getJobID(), "sync", ""+superstep); while (true) { List znodes = zk.getChildren(pathToSuperstepZnode, false); LOG.debug("leaveBarrier() !!! 
checking znodes contnains /ready node or not: at superstep:" @@ -236,8 +237,9 @@ public class ZooKeeperSyncClientImpl imp public void register(BSPJobID jobId, TaskAttemptID taskId, String hostAddress, long port) { try { - if (zk.exists("/" + jobId.toString(), false) == null) { - zk.create("/" + jobId.toString(), new byte[0], Ids.OPEN_ACL_UNSAFE, + String jobRegisterKey = constructKey(jobId, "peers"); + if (zk.exists(jobRegisterKey, false) == null) { + zk.create(jobRegisterKey, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (KeeperException e) { @@ -245,7 +247,7 @@ public class ZooKeeperSyncClientImpl imp } catch (InterruptedException e) { LOG.error(e); } - registerTask(zk, jobId, hostAddress, port, taskId); + registerTask(jobId, hostAddress, port, taskId); } /** @@ -259,54 +261,14 @@ public class ZooKeeperSyncClientImpl imp * @param port * @param taskId */ - public static void registerTask(ZooKeeper zk, BSPJobID jobId, - String hostAddress, long port, TaskAttemptID taskId) { - - byte[] taskIdBytes = serializeTaskId(taskId); + public void registerTask(BSPJobID jobId, String hostAddress, long port, + TaskAttemptID taskId) { - try { - zk.create("/" + jobId.toString() + "/" + hostAddress + ":" + port, - taskIdBytes, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); - } catch (KeeperException e) { - LOG.error(e); - } catch (InterruptedException e) { - LOG.error(e); - } - } - - private static byte[] serializeTaskId(TaskAttemptID taskId) { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - DataOutputStream out = new DataOutputStream(bos); - try { - taskId.write(out); - } catch (IOException e) { - LOG.error(e); - } finally { - try { - out.close(); - } catch (IOException e) { - LOG.error(e); - } - } - return bos.toByteArray(); - } + // byte[] taskIdBytes = serializeTaskId(taskId); + String taskRegisterKey = constructKey(jobId, "peers", hostAddress + ":" + + port); + writeNode(taskRegisterKey, taskId, false, null); - public static TaskAttemptID 
deserializeTaskId(byte[] arr) { - ByteArrayInputStream bis = new ByteArrayInputStream(arr); - DataInputStream in = new DataInputStream(bis); - TaskAttemptID id = new TaskAttemptID(); - try { - id.readFields(in); - } catch (IOException e) { - LOG.error(e); - } finally { - try { - in.close(); - } catch (IOException e) { - LOG.error(e); - } - } - return id; } @Override @@ -314,16 +276,22 @@ public class ZooKeeperSyncClientImpl imp if (allPeers == null) { TreeMap sortedMap = new TreeMap(); try { - allPeers = zk.getChildren("/" + taskId.getJobID().toString(), this) - .toArray(new String[0]); + allPeers = zk.getChildren(constructKey(taskId.getJobID(), "peers"), + this).toArray(new String[0]); for (String s : allPeers) { - byte[] data = zk.getData( - "/" + taskId.getJobID().toString() + "/" + s, this, null); - TaskAttemptID thatTask = deserializeTaskId(data); - LOG.debug("TASK mapping from zookeeper: " + thatTask + " ID:" - + thatTask.getTaskID().getId() + " : " + s); - sortedMap.put(thatTask.getTaskID().getId(), s); + byte[] data = zk.getData(constructKey(taskId.getJobID(), "peers", s), + this, null); + TaskAttemptID thatTask = new TaskAttemptID(); + boolean result = getValueFromBytes(data, thatTask); + + if(result){ + LOG.debug("TASK mapping from zookeeper: " + thatTask + " ID:" + + thatTask.getTaskID().getId() + " : " + s); + sortedMap.put(thatTask.getTaskID().getId(), s); + } + + } } catch (Exception e) { @@ -344,8 +312,13 @@ public class ZooKeeperSyncClientImpl imp } @Override - public void close() throws InterruptedException { - zk.close(); + public void close() throws IOException { + try{ + zk.close(); + } + catch(InterruptedException e){ + throw new IOException(e); + } } @Override @@ -379,36 +352,6 @@ public class ZooKeeperSyncClientImpl imp return peerAddress.getHostName() + ":" + peerAddress.getPort(); } - private String getNodeName(TaskAttemptID taskId, long superstep) { - return bspRoot + "/" + taskId.getJobID().toString() + "/" + superstep + "/" - + 
taskId.toString(); - } - - private void createZnode(final String path) throws KeeperException, - InterruptedException { - createZnode(path, CreateMode.PERSISTENT); - } - - private void createEphemeralZnode(final String path) throws KeeperException, - InterruptedException { - createZnode(path, CreateMode.EPHEMERAL); - } - - private void createZnode(final String path, final CreateMode mode) - throws KeeperException, InterruptedException { - synchronized (zk) { - Stat s = zk.exists(path, false); - if (null == s) { - try { - zk.create(path, null, Ids.OPEN_ACL_UNSAFE, mode); - } catch (KeeperException.NodeExistsException nee) { - LOG.debug("Ignore because znode may be already created at " + path, - nee); - } - } - } - } - /* * INNER CLASSES */ Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java?rev=1369575&r1=1369574&r2=1369575&view=diff ============================================================================== --- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java (original) +++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java Sun Aug 5 13:13:26 2012 @@ -316,7 +316,7 @@ public class TestBSPTaskFaults extends T HamaConfiguration hamaConf = new HamaConfiguration(); hamaConf.setInt(Constants.GROOM_PING_PERIOD, 200); hamaConf.setClass("bsp.work.class", FaulTestBSP.class, BSP.class); - hamaConf.setClass(SyncServiceFactory.SYNC_CLIENT_CLASS, + hamaConf.setClass(SyncServiceFactory.SYNC_PEER_CLASS, LocalBSPRunner.LocalSyncClient.class, SyncClient.class); hamaConf.setInt("bsp.master.port", 610002); @@ -421,7 +421,7 @@ public class TestBSPTaskFaults extends T conf.setInt(Constants.GROOM_PING_PERIOD, 200); conf.setClass("bsp.work.class", FaulTestBSP.class, BSP.class); - conf.setClass(SyncServiceFactory.SYNC_CLIENT_CLASS, + conf.setClass(SyncServiceFactory.SYNC_PEER_CLASS, 
LocalBSPRunner.LocalSyncClient.class, SyncClient.class); int testNumber = incrementTestNumber(); Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java?rev=1369575&r1=1369574&r2=1369575&view=diff ============================================================================== --- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java (original) +++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java Sun Aug 5 13:13:26 2012 @@ -17,7 +17,18 @@ */ package org.apache.hama.bsp; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; import junit.framework.TestCase; @@ -25,19 +36,30 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.util.ReflectionUtils; import org.apache.hama.Constants; -import org.apache.hama.HamaConfiguration; -import org.apache.hama.bsp.TestBSPTaskFaults.MinimalGroomServer; -import org.apache.hama.bsp.message.type.ByteMessage; -import org.apache.hama.bsp.sync.SyncClient; +import org.apache.hama.bsp.Counters.Counter; +import 
org.apache.hama.bsp.ft.AsyncRcvdMsgCheckpointImpl; +import org.apache.hama.bsp.ft.FaultTolerantPeerService; +import org.apache.hama.bsp.message.MessageEventListener; +import org.apache.hama.bsp.message.MessageManager; +import org.apache.hama.bsp.message.MessageQueue; +import org.apache.hama.bsp.sync.BSPPeerSyncClient; +import org.apache.hama.bsp.sync.PeerSyncClient; +import org.apache.hama.bsp.sync.SyncEvent; +import org.apache.hama.bsp.sync.SyncEventListener; +import org.apache.hama.bsp.sync.SyncException; import org.apache.hama.bsp.sync.SyncServiceFactory; -import org.apache.hama.ipc.BSPPeerProtocol; -import org.apache.hama.ipc.HamaRPCProtocolVersion; import org.apache.hama.util.BSPNetUtils; +import org.apache.hama.util.KeyValuePair; public class TestCheckpoint extends TestCase { @@ -45,130 +67,578 @@ public class TestCheckpoint extends Test static final String checkpointedDir = "checkpoint/job_201110302255_0001/0/"; - @SuppressWarnings({ "unchecked", "rawtypes" }) + public static class TestMessageManager implements MessageManager { + + List messageQueue = new ArrayList(); + BSPMessageBundle loopbackBundle = new BSPMessageBundle(); + Iterator iter = null; + MessageEventListener listener; + + @Override + public void init(TaskAttemptID attemptId, BSPPeer peer, + Configuration conf, InetSocketAddress peerAddress) { + // TODO Auto-generated method stub + + } + + @Override + public void close() { + // TODO Auto-generated method stub + + } + + @Override + public Text getCurrentMessage() throws IOException { + if (iter == null) + iter = this.messageQueue.iterator(); + if (iter.hasNext()) + return iter.next(); + return null; + } + + @Override + public void send(String peerName, Text msg) throws IOException { + } + + @Override + public void finishSendPhase() throws IOException { + } + + @Override + public Iterator>> getMessageIterator() { + return null; + } + + @Override + public void transfer(InetSocketAddress addr, BSPMessageBundle bundle) + throws IOException { + // 
TODO Auto-generated method stub + + } + + @Override + public void clearOutgoingQueues() { + } + + @Override + public int getNumCurrentMessages() { + return this.messageQueue.size(); + } + + public BSPMessageBundle getLoopbackBundle() { + return this.loopbackBundle; + } + + public void addMessage(Text message) throws IOException { + this.messageQueue.add(message); + listener.onMessageReceived(message); + } + + @SuppressWarnings("unchecked") + @Override + public void loopBackMessages(BSPMessageBundle bundle) { + this.loopbackBundle = (BSPMessageBundle) bundle; + } + + @Override + public void loopBackMessage(Writable message) { + } + + @Override + public void registerListener(MessageEventListener listener) + throws IOException { + this.listener = listener; + } + + } + + public static class TestBSPPeer implements + BSPPeer { + + Configuration conf; + long superstepCount; + FaultTolerantPeerService fService; + + public TestBSPPeer(BSPJob job, Configuration conf, TaskAttemptID taskId, + Counters counters, long superstep, BSPPeerSyncClient syncClient, + MessageManager messenger, TaskStatus.State state) { + this.conf = conf; + if (superstep > 0) + superstepCount = superstep; + else + superstepCount = 0L; + + try { + fService = (new AsyncRcvdMsgCheckpointImpl()).constructPeerFaultTolerance( + job, (BSPPeer) this, + (BSPPeerSyncClient) syncClient, null, taskId, superstep, conf, + messenger); + this.fService.onPeerInitialized(state); + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Override + public void send(String peerName, Text msg) throws IOException { + } + + @Override + public Text getCurrentMessage() throws IOException { + return new Text("data"); + } + + @Override + public int getNumCurrentMessages() { + return 1; + } + + @Override + public void sync() throws IOException, SyncException, InterruptedException { + ++superstepCount; + try { + this.fService.afterBarrier(); + } catch (Exception e) { + e.printStackTrace(); + } + LOG.info("After barrier " + 
superstepCount); + } + + @Override + public long getSuperstepCount() { + return superstepCount; + } + + @Override + public String getPeerName() { + return null; + } + + @Override + public String getPeerName(int index) { + return null; + } + + @Override + public int getPeerIndex() { + return 1; + } + + @Override + public String[] getAllPeerNames() { + return null; + } + + @Override + public int getNumPeers() { + return 0; + } + + @Override + public void clear() { + + } + + @Override + public void write(NullWritable key, NullWritable value) throws IOException { + + } + + @Override + public boolean readNext(NullWritable key, NullWritable value) + throws IOException { + return false; + } + + @Override + public KeyValuePair readNext() + throws IOException { + return null; + } + + @Override + public void reopenInput() throws IOException { + + } + + @Override + public Configuration getConfiguration() { + return null; + } + + @Override + public Counter getCounter(Enum name) { + return null; + } + + @Override + public Counter getCounter(String group, String name) { + return null; + } + + @Override + public void incrementCounter(Enum key, long amount) { + + } + + @Override + public void incrementCounter(String group, String counter, long amount) { + + } + + } + + public static class TempSyncClient extends BSPPeerSyncClient { + + Map valueMap = new HashMap(); + + @Override + public String constructKey(BSPJobID jobId, String... 
args) { + StringBuffer buffer = new StringBuffer(100); + buffer.append(jobId.toString()).append("/"); + for (String arg : args) { + buffer.append(arg).append("/"); + } + return buffer.toString(); + } + + @Override + public boolean storeInformation(String key, Writable value, + boolean permanent, SyncEventListener listener) { + ArrayWritable writables = (ArrayWritable) value; + long step = ((LongWritable) writables.get()[0]).get(); + long count = ((LongWritable) writables.get()[1]).get(); + + LOG.info("SyncClient Storing value step = " + step + " count = " + count + + " for key " + key); + valueMap.put(key, value); + return true; + } + + @Override + public boolean getInformation(String key, + Writable valueHolder) { + LOG.info("Getting value for key " + key); + if(!valueMap.containsKey(key)){ + return false; + } + Writable value = valueMap.get(key); + ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); + DataOutputStream outputStream = new DataOutputStream(byteStream); + byte[] data = null; + try { + value.write(outputStream); + outputStream.flush(); + data = byteStream.toByteArray(); + ByteArrayInputStream istream = new ByteArrayInputStream(data); + DataInputStream diStream = new DataInputStream(istream); + valueHolder.readFields(diStream); + return true; + } catch (IOException e) { + LOG.error("Error writing data to write buffer.", e); + } finally { + try { + byteStream.close(); + outputStream.close(); + } catch (IOException e) { + LOG.error("Error closing byte stream.", e); + } + } + return false; + } + + @Override + public boolean addKey(String key, boolean permanent, + SyncEventListener listener) { + valueMap.put(key, NullWritable.get()); + return true; + } + + @Override + public boolean hasKey(String key) { + return valueMap.containsKey(key); + } + + @Override + public String[] getChildKeySet(String key, SyncEventListener listener) { + List list = new ArrayList(); + Iterator keyIter = valueMap.keySet().iterator(); + while (keyIter.hasNext()) { + 
String keyVal = keyIter.next(); + if (keyVal.startsWith(key + "/")) { + list.add(keyVal); + } + } + String[] arr = new String[list.size()]; + list.toArray(arr); + return arr; + } + + @Override + public boolean registerListener(String key, SyncEvent event, + SyncEventListener listener) { + return false; + } + + @Override + public boolean remove(String key, SyncEventListener listener) { + valueMap.remove(key); + return false; + } + + @Override + public void init(Configuration conf, BSPJobID jobId, TaskAttemptID taskId) + throws Exception { + } + + @Override + public void enterBarrier(BSPJobID jobId, TaskAttemptID taskId, + long superstep) throws SyncException { + LOG.info("Enter barrier called - " + superstep); + } + + @Override + public void leaveBarrier(BSPJobID jobId, TaskAttemptID taskId, + long superstep) throws SyncException { + LOG.info("Exit barrier called - " + superstep); + } + + @Override + public void register(BSPJobID jobId, TaskAttemptID taskId, + String hostAddress, long port) { + } + + @Override + public String[] getAllPeerNames(TaskAttemptID taskId) { + return null; + } + + @Override + public void deregisterFromBarrier(BSPJobID jobId, TaskAttemptID taskId, + String hostAddress, long port) { + } + + @Override + public void stopServer() { + } + + @Override + public void close() throws IOException { + } + + } + + private void checkSuperstepMsgCount(PeerSyncClient syncClient, + @SuppressWarnings("rawtypes") + BSPPeer bspTask, BSPJob job, long step, long count) { + + ArrayWritable writableVal = new ArrayWritable(LongWritable.class); + + boolean result = syncClient.getInformation( + syncClient.constructKey(job.getJobID(), "checkpoint", + "" + bspTask.getPeerIndex()), writableVal); + + assertTrue(result); + + LongWritable superstepNo = (LongWritable) writableVal.get()[0]; + LongWritable msgCount = (LongWritable) writableVal.get()[1]; + + assertEquals(step, superstepNo.get()); + assertEquals(count, msgCount.get()); + } + + public void 
testCheckpointInterval() throws Exception { + Configuration config = new Configuration(); + System.setProperty("user.dir", "/tmp"); + config.set(SyncServiceFactory.SYNC_PEER_CLASS, + TempSyncClient.class.getName()); + config.set(Constants.FAULT_TOLERANCE_CLASS, + AsyncRcvdMsgCheckpointImpl.class.getName()); + config.setBoolean(Constants.FAULT_TOLERANCE_FLAG, true); + config.setBoolean(Constants.CHECKPOINT_ENABLED, true); + config.setInt(Constants.CHECKPOINT_INTERVAL, 2); + config.set("bsp.output.dir", "/tmp/hama-test_out"); + config.set("bsp.local.dir", "/tmp/hama-test"); + + FileSystem dfs = FileSystem.get(config); + BSPJob job = new BSPJob(new BSPJobID("checkpttest", 1), "/tmp"); + TaskAttemptID taskId = new TaskAttemptID(new TaskID(job.getJobID(), 1), 1); + + TestMessageManager messenger = new TestMessageManager(); + PeerSyncClient syncClient = (TempSyncClient) SyncServiceFactory + .getPeerSyncClient(config); + @SuppressWarnings("rawtypes") + BSPPeer bspTask = new TestBSPPeer(job, config, taskId, new Counters(), -1L, + (BSPPeerSyncClient) syncClient, messenger, TaskStatus.State.RUNNING); + + assertNotNull("BSPPeerImpl should not be null.", bspTask); + + LOG.info("Created bsp peer and other parameters"); + int port = BSPNetUtils.getFreePort(12502); + LOG.info("Got port = " + port); + + boolean result = syncClient.getInformation( + syncClient.constructKey(job.getJobID(), "checkpoint", + "" + bspTask.getPeerIndex()), new ArrayWritable(LongWritable.class)); + + assertFalse(result); + + bspTask.sync(); + // Superstep 1 + + checkSuperstepMsgCount(syncClient, bspTask, job, 1L, 0L); + + Text txtMessage = new Text("data"); + messenger.addMessage(txtMessage); + + bspTask.sync(); + // Superstep 2 + + checkSuperstepMsgCount(syncClient, bspTask, job, 1L, 0L); + + messenger.addMessage(txtMessage); + + bspTask.sync(); + // Superstep 3 + + checkSuperstepMsgCount(syncClient, bspTask, job, 3L, 1L); + + bspTask.sync(); + // Superstep 4 + + checkSuperstepMsgCount(syncClient, 
bspTask, job, 3L, 1L); + + messenger.addMessage(txtMessage); + messenger.addMessage(txtMessage); + + bspTask.sync(); + // Superstep 5 + + checkSuperstepMsgCount(syncClient, bspTask, job, 5L, 2L); + + bspTask.sync(); + // Superstep 6 + + checkSuperstepMsgCount(syncClient, bspTask, job, 5L, 2L); + + dfs.delete(new Path("checkpoint"), true); + } + + @SuppressWarnings("rawtypes") public void testCheckpoint() throws Exception { Configuration config = new Configuration(); - config.set(SyncServiceFactory.SYNC_CLIENT_CLASS, - LocalBSPRunner.LocalSyncClient.class.getName()); + config.set(SyncServiceFactory.SYNC_PEER_CLASS, + TempSyncClient.class.getName()); + config.setBoolean(Constants.FAULT_TOLERANCE_FLAG, true); + config.set(Constants.FAULT_TOLERANCE_CLASS, + AsyncRcvdMsgCheckpointImpl.class.getName()); + config.setBoolean(Constants.CHECKPOINT_ENABLED, true); + int port = BSPNetUtils.getFreePort(12502); + LOG.info("Got port = " + port); + + config.set(Constants.PEER_HOST, Constants.DEFAULT_PEER_HOST); + config.setInt(Constants.PEER_PORT, port); + config.set("bsp.output.dir", "/tmp/hama-test_out"); + config.set("bsp.local.dir", "/tmp/hama-test"); + FileSystem dfs = FileSystem.get(config); + BSPJob job = new BSPJob(new BSPJobID("checkpttest", 1), "/tmp"); + TaskAttemptID taskId = new TaskAttemptID(new TaskID(job.getJobID(), 1), 1); + + TestMessageManager messenger = new TestMessageManager(); + PeerSyncClient syncClient = (TempSyncClient) SyncServiceFactory + .getPeerSyncClient(config); + BSPPeer bspTask = new TestBSPPeer(job, config, taskId, new Counters(), -1L, + (BSPPeerSyncClient) syncClient, messenger, TaskStatus.State.RUNNING); - BSPPeerImpl bspTask = new BSPPeerImpl(config, dfs); - bspTask.setCurrentTaskStatus(new TaskStatus(new BSPJobID(), - new TaskAttemptID(), 1.0f, TaskStatus.State.RUNNING, "running", - "127.0.0.1", TaskStatus.Phase.STARTING, new Counters())); assertNotNull("BSPPeerImpl should not be null.", bspTask); - if (dfs.mkdirs(new Path("checkpoint"))) { - 
if (dfs.mkdirs(new Path("checkpoint/job_201110302255_0001"))) { - if (dfs.mkdirs(new Path("checkpoint/job_201110302255_0001/0"))) - ; - } - } - assertTrue("Make sure directory is created.", - dfs.exists(new Path(checkpointedDir))); - byte[] tmpData = "data".getBytes(); - BSPMessageBundle bundle = new BSPMessageBundle(); - bundle.addMessage(new ByteMessage("abc".getBytes(), tmpData)); - assertNotNull("Message bundle can not be null.", bundle); - assertNotNull("Configuration should not be null.", config); - bspTask.checkpoint(checkpointedDir + "/attempt_201110302255_0001_000000_0", - bundle); - FSDataInputStream in = dfs.open(new Path(checkpointedDir - + "/attempt_201110302255_0001_000000_0")); - BSPMessageBundle bundleRead = new BSPMessageBundle(); - bundleRead.readFields(in); - in.close(); - ByteMessage byteMsg = (ByteMessage) (bundleRead.getMessages()).get(0); - String content = new String(byteMsg.getData()); - LOG.info("Saved checkpointed content is " + content); - assertTrue("Message content should be the same.", "data".equals(content)); + + LOG.info("Created bsp peer and other parameters"); + + @SuppressWarnings("unused") + FaultTolerantPeerService service = null; + + bspTask.sync(); + LOG.info("Completed first sync."); + + checkSuperstepMsgCount(syncClient, bspTask, job, 1L, 0L); + + Text txtMessage = new Text("data"); + messenger.addMessage(txtMessage); + + bspTask.sync(); + + LOG.info("Completed second sync."); + + checkSuperstepMsgCount(syncClient, bspTask, job, 2L, 1L); + + // Checking the messages for superstep 2 and peer id 1 + String expectedPath = "checkpoint/job_checkpttest_0001/2/1"; + FSDataInputStream in = dfs.open(new Path(expectedPath)); + + String className = in.readUTF(); + Text message = (Text) ReflectionUtils.newInstance(Class.forName(className), + config); + message.readFields(in); + + assertEquals("data", message.toString()); + dfs.delete(new Path("checkpoint"), true); } - public void testCheckpointInterval() throws Exception { + public 
void testPeerRecovery() throws Exception { + Configuration config = new Configuration(); + config.set(SyncServiceFactory.SYNC_PEER_CLASS, + TempSyncClient.class.getName()); + config.set(Constants.FAULT_TOLERANCE_CLASS, + AsyncRcvdMsgCheckpointImpl.class.getName()); + config.setBoolean(Constants.CHECKPOINT_ENABLED, true); + int port = BSPNetUtils.getFreePort(12502); + LOG.info("Got port = " + port); - Configuration conf = new Configuration(); - conf.set("bsp.output.dir", "/tmp/hama-test_out"); - conf.setClass(SyncServiceFactory.SYNC_CLIENT_CLASS, - LocalBSPRunner.LocalSyncClient.class, SyncClient.class); - - conf.setBoolean(Constants.CHECKPOINT_ENABLED, false); - - int port = BSPNetUtils.getFreePort(5000); - InetSocketAddress inetAddress = new InetSocketAddress(port); - MinimalGroomServer groom = new MinimalGroomServer(conf); - Server workerServer = RPC.getServer(groom, inetAddress.getHostName(), - inetAddress.getPort(), conf); - workerServer.start(); - - LOG.info("Started RPC server"); - conf.setInt("bsp.groom.rpc.port", inetAddress.getPort()); - conf.setInt("bsp.peers.num", 1); - - BSPPeerProtocol umbilical = (BSPPeerProtocol) RPC.getProxy( - BSPPeerProtocol.class, HamaRPCProtocolVersion.versionID, inetAddress, - conf); - LOG.info("Started the proxy connections"); - - TaskAttemptID tid = new TaskAttemptID(new TaskID(new BSPJobID( - "job_201110102255", 1), 1), 1); - - try { - BSPJob job = new BSPJob(new HamaConfiguration(conf)); - job.setOutputPath(TestBSPMasterGroomServer.OUTPUT_PATH); - job.setOutputFormat(TextOutputFormat.class); - final BSPPeerProtocol proto = (BSPPeerProtocol) RPC.getProxy( - BSPPeerProtocol.class, HamaRPCProtocolVersion.versionID, - new InetSocketAddress("127.0.0.1", port), conf); + config.set(Constants.PEER_HOST, Constants.DEFAULT_PEER_HOST); + config.setInt(Constants.PEER_PORT, port); - BSPTask task = new BSPTask(); - task.setConf(job); + config.set("bsp.output.dir", "/tmp/hama-test_out"); + config.set("bsp.local.dir", "/tmp/hama-test"); - 
@SuppressWarnings("rawtypes") - BSPPeerImpl bspPeer = new BSPPeerImpl(job, conf, tid, - proto, 0, null, null, new Counters()); + FileSystem dfs = FileSystem.get(config); + BSPJob job = new BSPJob(new BSPJobID("checkpttest", 1), "/tmp"); + TaskAttemptID taskId = new TaskAttemptID(new TaskID(job.getJobID(), 1), 1); - bspPeer.setCurrentTaskStatus(new TaskStatus(new BSPJobID(), tid, 1.0f, - TaskStatus.State.RUNNING, "running", "127.0.0.1", - TaskStatus.Phase.STARTING, new Counters())); - - assertEquals(bspPeer.isReadyToCheckpoint(), false); - - conf.setBoolean(Constants.CHECKPOINT_ENABLED, true); - conf.setInt(Constants.CHECKPOINT_INTERVAL, 3); - - bspPeer.sync(); - - LOG.info("Is Ready = " + bspPeer.isReadyToCheckpoint() + " at step " - + bspPeer.getSuperstepCount()); - assertEquals(bspPeer.isReadyToCheckpoint(), false); - bspPeer.sync(); - LOG.info("Is Ready = " + bspPeer.isReadyToCheckpoint() + " at step " - + bspPeer.getSuperstepCount()); - assertEquals(bspPeer.isReadyToCheckpoint(), false); - bspPeer.sync(); - LOG.info("Is Ready = " + bspPeer.isReadyToCheckpoint() + " at step " - + bspPeer.getSuperstepCount()); - assertEquals(bspPeer.isReadyToCheckpoint(), true); - - job.setCheckPointInterval(5); - bspPeer.sync(); - LOG.info("Is Ready = " + bspPeer.isReadyToCheckpoint() + " at step " - + bspPeer.getSuperstepCount()); - assertEquals(bspPeer.isReadyToCheckpoint(), false); - bspPeer.sync(); - LOG.info("Is Ready = " + bspPeer.isReadyToCheckpoint() + " at step " - + bspPeer.getSuperstepCount()); - assertEquals(bspPeer.isReadyToCheckpoint(), false); - - } catch (Exception e) { - LOG.error("Error testing BSPPeer.", e); - } finally { - umbilical.close(); - Thread.sleep(2000); - workerServer.stop(); - Thread.sleep(2000); + TestMessageManager messenger = new TestMessageManager(); + PeerSyncClient syncClient = (TempSyncClient) SyncServiceFactory + .getPeerSyncClient(config); + + Text txtMessage = new Text("data"); + String writeKey = "job_checkpttest_0001/checkpoint/1/"; + + 
Writable[] writableArr = new Writable[2]; + writableArr[0] = new LongWritable(3L); + writableArr[1] = new LongWritable(5L); + ArrayWritable arrWritable = new ArrayWritable(LongWritable.class); + arrWritable.set(writableArr); + syncClient.storeInformation(writeKey, arrWritable, true, null); + + String writePath = "checkpoint/job_checkpttest_0001/3/1"; + FSDataOutputStream out = dfs.create(new Path(writePath)); + for (int i = 0; i < 5; ++i) { + out.writeUTF(txtMessage.getClass().getCanonicalName()); + txtMessage.write(out); } + out.close(); + @SuppressWarnings("unused") + BSPPeer bspTask = new TestBSPPeer(job, config, taskId, + new Counters(), 3L, (BSPPeerSyncClient) syncClient, messenger, + TaskStatus.State.RECOVERING); + + BSPMessageBundle bundleRead = messenger.getLoopbackBundle(); + assertEquals(5, bundleRead.getMessages().size()); + String recoveredMsg = bundleRead.getMessages().get(0).toString(); + assertEquals(recoveredMsg, "data"); + dfs.delete(new Path("checkpoint"), true); } + } Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java?rev=1369575&r1=1369574&r2=1369575&view=diff ============================================================================== --- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java (original) +++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java Sun Aug 5 13:13:26 2012 @@ -20,20 +20,22 @@ package org.apache.hama.bsp; import java.io.IOException; -import java.util.ArrayList; import junit.framework.TestCase; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; import org.apache.hama.Constants; import org.apache.hama.HamaConfiguration; +import org.apache.hama.bsp.sync.SyncServiceFactory; +import org.apache.hama.bsp.sync.ZKSyncBSPMasterClient; +import 
org.apache.hama.bsp.sync.ZooKeeperSyncClientImpl; import org.apache.hama.bsp.sync.ZooKeeperSyncServerImpl; import org.apache.hama.util.BSPNetUtils; -import org.apache.hama.zookeeper.QuorumPeer; -import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.ZooDefs.Ids; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.data.Stat; +import org.mortbay.log.Log; public class TestZooKeeper extends TestCase { @@ -41,6 +43,7 @@ public class TestZooKeeper extends TestC public TestZooKeeper() { configuration = new HamaConfiguration(); + System.setProperty("user.dir", "/tmp"); configuration.set("bsp.master.address", "localhost"); assertEquals("Make sure master addr is set to localhost:", "localhost", configuration.get("bsp.master.address")); @@ -57,6 +60,7 @@ public class TestZooKeeper extends TestC public void testClearZKNodes() throws IOException, KeeperException, InterruptedException { final ZooKeeperSyncServerImpl server = new ZooKeeperSyncServerImpl(); + boolean done = false; try { server.init(configuration); new Thread(new Runnable() { @@ -71,55 +75,113 @@ public class TestZooKeeper extends TestC } }).start(); - int timeout = configuration.getInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, - 6000); - String connectStr = QuorumPeer.getZKQuorumServersString(configuration); - String bspRoot = "/"; - // Establishing a zk session. - ZooKeeper zk = new ZooKeeper(connectStr, timeout, null); - - // Creating dummy bspRoot if it doesn't already exist. - Stat s = zk.exists(bspRoot, false); - if (s == null) { - zk.create(bspRoot, new byte[0], Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); - } - - // Creating dummy child nodes at depth 1. - String node1 = bspRoot + "task1"; - String node2 = bspRoot + "task2"; - zk.create(node1, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.create(node2, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - - // Creating dummy child node at depth 2. 
- String node11 = node1 + "superstep1"; - zk.create(node11, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - - ArrayList list = (ArrayList) zk.getChildren(bspRoot, - false); - assertEquals(2, list.size()); - System.out.println(list.size()); - - // clear it - BSPMaster.clearZKNodes(zk, "/"); - - list = (ArrayList) zk.getChildren(bspRoot, false); - System.out.println(list.size()); - assertEquals(0, list.size()); - - try { - zk.getData(node11, false, null); - fail(); - } catch (KeeperException.NoNodeException e) { - System.out.println("Node has been removed correctly!"); - } finally { - zk.close(); - } + Thread.sleep(1000); + + String bspRoot = "/bsp"; + + ZooKeeperSyncClientImpl peerClient = (ZooKeeperSyncClientImpl) SyncServiceFactory + .getPeerSyncClient(configuration); + + ZKSyncBSPMasterClient masterClient = (ZKSyncBSPMasterClient) SyncServiceFactory + .getMasterSyncClient(configuration); + + masterClient.init(configuration); + + Thread.sleep(100); + + Log.info("Created master and client sync clients"); + + assertTrue(masterClient.hasKey(bspRoot)); + + Log.info("BSP root exists"); + + BSPJobID jobID = new BSPJobID("test1", 1); + masterClient.registerJob(jobID.toString()); + TaskID taskId1 = new TaskID(jobID, 1); + TaskID taskId2 = new TaskID(jobID, 2); + + TaskAttemptID task1 = new TaskAttemptID(taskId1, 1); + TaskAttemptID task2 = new TaskAttemptID(taskId2, 1); + + int zkPort = BSPNetUtils.getFreePort(21815); + configuration.setInt(Constants.PEER_PORT, zkPort); + peerClient.init(configuration, jobID, task1); + + peerClient.registerTask(jobID, "hamanode1", 5000L, task1); + peerClient.registerTask(jobID, "hamanode2", 5000L, task2); + + peerClient.storeInformation( + peerClient.constructKey(jobID, "info", "level2"), new IntWritable(5), + true, null); + + String[] names = peerClient.getAllPeerNames(task1); + + Log.info("Found child count = " + names.length); + + assertEquals(2, names.length); + + Log.info("Passed the child count test"); + + 
masterClient.addKey(masterClient.constructKey(jobID, "peer", "1"), + true, null); + masterClient.addKey(masterClient.constructKey(jobID, "peer", "2"), + true, null); + + String[] peerChild = masterClient.getChildKeySet( + masterClient.constructKey(jobID, "peer"), null); + Log.info("Found child count = " + peerChild.length); + + assertEquals(2, peerChild.length); + + Log.info(" Peer name " + peerChild[0]); + Log.info(" Peer name " + peerChild[1]); + + Log.info("Passed the child key set test"); + + masterClient.deregisterJob(jobID.toString()); + Log.info(masterClient.constructKey(jobID)); + + Thread.sleep(200); + + assertEquals(false, masterClient.hasKey(masterClient.constructKey(jobID))); + + Log.info("Passed the key presence test"); + + boolean result = masterClient + .getInformation(masterClient.constructKey(jobID, "info", "level3"), + new IntWritable()); + + assertEquals(false, result); + + Writable[] writableArr = new Writable[2]; + writableArr[0] = new LongWritable(3L); + writableArr[1] = new LongWritable(5L); + ArrayWritable arrWritable = new ArrayWritable(LongWritable.class); + arrWritable.set(writableArr); + masterClient.storeInformation( + masterClient.constructKey(jobID, "info", "level3"), + arrWritable, true, null); + + ArrayWritable valueHolder = new ArrayWritable(LongWritable.class); + + boolean getResult = masterClient.getInformation( + masterClient.constructKey(jobID, "info", "level3"), valueHolder); + + assertTrue(getResult); + + assertEquals(arrWritable.get()[0], valueHolder.get()[0]); + assertEquals(arrWritable.get()[1], valueHolder.get()[1]); + + Log.info("Passed array writable test"); + done = true; + } catch (Exception e) { e.printStackTrace(); + } finally { server.stopServer(); } + assertEquals(true, done); } } Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/sync/TestSyncServiceFactory.java URL: 
http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/sync/TestSyncServiceFactory.java?rev=1369575&r1=1369574&r2=1369575&view=diff ============================================================================== --- hama/trunk/core/src/test/java/org/apache/hama/bsp/sync/TestSyncServiceFactory.java (original) +++ hama/trunk/core/src/test/java/org/apache/hama/bsp/sync/TestSyncServiceFactory.java Sun Aug 5 13:13:26 2012 @@ -17,20 +17,64 @@ */ package org.apache.hama.bsp.sync; +import java.util.concurrent.Executors; + import junit.framework.TestCase; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hama.Constants; +import org.apache.hama.bsp.BSPJobID; +import org.apache.hama.bsp.TaskAttemptID; +import org.apache.hama.bsp.TaskID; +import org.apache.hama.util.BSPNetUtils; public class TestSyncServiceFactory extends TestCase { + public static final Log LOG = LogFactory.getLog(TestCase.class); + + public static class ListenerTest extends ZKSyncEventListener { + + private Text value; + + public ListenerTest() { + value = new Text("init"); + } + + public String getValue() { + return value.toString(); + } + + @Override + public void onDelete() { + + } + + @Override + public void onChange() { + LOG.info("ZK value changed event triggered."); + value.set("Changed"); + + } + + @Override + public void onChildKeySetChange() { + + } + + } + public void testClientInstantiation() throws Exception { Configuration conf = new Configuration(); // given null, should return zookeeper - SyncClient syncClient = SyncServiceFactory.getSyncClient(conf); + PeerSyncClient syncClient = SyncServiceFactory.getPeerSyncClient(conf); assertTrue(syncClient instanceof ZooKeeperSyncClientImpl); } - + public void testServerInstantiation() throws Exception { Configuration conf = new Configuration(); @@ -39,4 
+83,104 @@ public class TestSyncServiceFactory exte assertTrue(syncServer instanceof ZooKeeperSyncServerImpl); } + private static class ZKServerThread implements Runnable { + + SyncServer server; + + ZKServerThread(SyncServer s) { + server = s; + } + + @Override + public void run() { + try { + server.start(); + } catch (Exception e) { + LOG.error("Error running server.", e); + } + } + + } + + public void testZKSyncStore() throws Exception { + Configuration conf = new Configuration(); + int zkPort = BSPNetUtils.getFreePort(21811); + conf.set("bsp.local.dir", "/tmp/hama-test"); + conf.set("bsp.output.dir", "/tmp/hama-test_out"); + conf.setInt(Constants.PEER_PORT, zkPort); + conf.set(Constants.ZOOKEEPER_QUORUM, "localhost"); + conf.setInt(Constants.ZOOKEEPER_CLIENT_PORT, zkPort); + conf.set(Constants.ZOOKEEPER_SESSION_TIMEOUT, "12000"); + System.setProperty("user.dir", "/tmp"); + // given null, should return zookeeper + final SyncServer syncServer = SyncServiceFactory.getSyncServer(conf); + syncServer.init(conf); + assertTrue(syncServer instanceof ZooKeeperSyncServerImpl); + + ZKServerThread serverThread = new ZKServerThread(syncServer); + Executors.newFixedThreadPool(1).submit(serverThread); + + Thread.sleep(1000); + + final PeerSyncClient syncClient = (PeerSyncClient) SyncServiceFactory + .getPeerSyncClient(conf); + assertTrue(syncClient instanceof ZooKeeperSyncClientImpl); + BSPJobID jobId = new BSPJobID("abc", 1); + TaskAttemptID taskId = new TaskAttemptID(new TaskID(jobId, 1), 1); + syncClient.init(conf, jobId, taskId); + + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + try { + syncServer.stopServer(); + + } catch (Exception e) { + // too late to log! 
+ } + } + }); + + IntWritable data = new IntWritable(5); + syncClient.storeInformation( + syncClient.constructKey(jobId, String.valueOf(1L), "test"), data, true, + null); + + ListenerTest listenerTest = new ListenerTest(); + + syncClient.registerListener( + syncClient.constructKey(jobId, String.valueOf(1L), "test"), + ZKSyncEventFactory.getValueChangeEvent(), listenerTest); + + IntWritable valueHolder = new IntWritable(); + boolean result = syncClient + .getInformation( + syncClient.constructKey(jobId, String.valueOf(1L), "test"), + valueHolder); + assertTrue(result); + int intVal = valueHolder.get(); + assertTrue(intVal == data.get()); + + data.set(6); + syncClient.storeInformation( + syncClient.constructKey(jobId, String.valueOf(1L), "test"), data, true, + null); + valueHolder = new IntWritable(); + result = syncClient + .getInformation( + syncClient.constructKey(jobId, String.valueOf(1L), "test"), + valueHolder); + + assertTrue(result); + intVal = valueHolder.get(); + assertTrue(intVal == data.get()); + + Thread.sleep(5000); + + assertEquals(true, listenerTest.getValue().equals("Changed")); + + syncServer.stopServer(); + + } + }