hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1701715 - in /hama/trunk: core/src/main/java/org/apache/hama/bsp/ft/ core/src/main/java/org/apache/hama/bsp/message/ core/src/test/java/org/apache/hama/bsp/ examples/src/main/java/org/apache/hama/examples/ graph/src/main/java/org/apache/hama/graph/
Date Tue, 08 Sep 2015 01:53:53 GMT
Author: edwardyoon
Date: Tue Sep  8 01:53:53 2015
New Revision: 1701715

URL: http://svn.apache.org/r1701715
Log:
HAMA-973: Fix FT bugs

Modified:
    hama/trunk/core/src/main/java/org/apache/hama/bsp/ft/AsyncRcvdMsgCheckpointImpl.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageEventListener.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java
    hama/trunk/examples/src/main/java/org/apache/hama/examples/RandBench.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/ft/AsyncRcvdMsgCheckpointImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/ft/AsyncRcvdMsgCheckpointImpl.java?rev=1701715&r1=1701714&r2=1701715&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/ft/AsyncRcvdMsgCheckpointImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/ft/AsyncRcvdMsgCheckpointImpl.java Tue Sep  8 01:53:53 2015
@@ -390,16 +390,26 @@ public class AsyncRcvdMsgCheckpointImpl<
           Path path = new Path(checkpointPath(superstepProgress));
           FSDataInputStream in = this.fs.open(path);
           BSPMessageBundle<M> bundle = new BSPMessageBundle<M>();
+
           try {
             for (int i = 0; i < numMessages; ++i) {
               String className = in.readUTF();
-              @SuppressWarnings("unchecked")
-              M message = (M) ReflectionUtils.newInstance(
-                  Class.forName(className), conf);
-              message.readFields(in);
-              bundle.addMessage(message);
+              if (className.equals(BSPMessageBundle.class.getCanonicalName())) {
+                BSPMessageBundle<M> tmp = new BSPMessageBundle<M>();
+                tmp.readFields(in);
+                messenger.loopBackBundle(tmp);
+              } else {
+                @SuppressWarnings("unchecked")
+                M message = (M) ReflectionUtils.newInstance(
+                    Class.forName(className), conf);
+                message.readFields(in);
+                bundle.addMessage(message);
+              }
+            }
+
+            if (bundle.size() > 0) {
+              messenger.loopBackBundle(bundle);
             }
-            messenger.loopBackBundle(bundle);
           } catch (EOFException e) {
             LOG.error("Error recovering from checkpointing", e);
             throw new IOException(e);
@@ -514,6 +524,7 @@ public class AsyncRcvdMsgCheckpointImpl<
                   + checkpointedPath, ioe);
             }
           }
+          
           try {
             ++checkpointMessageCount;
             checkpointStream.writeUTF(message.getClass().getCanonicalName());
@@ -537,6 +548,49 @@ public class AsyncRcvdMsgCheckpointImpl<
 
     }
 
+    @Override
+    public void onBundleReceived(BSPMessageBundle<M> bundle) {
+      String checkpointedPath = null;
+
+      if (bundle == null) {
+        LOG.error("bundle is found to be null");
+      }
+
+      synchronized (this) {
+        if (checkpointState) {
+          if (this.checkpointStream == null) {
+            checkpointedPath = checkpointPath(peer.getSuperstepCount() + 1);
+            try {
+              LOG.info("Creating path " + checkpointedPath);
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Creating path " + checkpointedPath);
+              }
+              checkpointStream = this.fs.create(new Path(checkpointedPath));
+            } catch (IOException ioe) {
+              LOG.error("Fail checkpointing messages to " + checkpointedPath,
+                  ioe);
+              throw new RuntimeException("Failed opening HDFS file "
+                  + checkpointedPath, ioe);
+            }
+          }
+
+          try {
+            ++checkpointMessageCount;
+            checkpointStream.writeUTF(bundle.getClass().getCanonicalName());
+            bundle.write(checkpointStream);
+          } catch (IOException ioe) {
+            LOG.error("Fail checkpointing messages to " + checkpointedPath, ioe);
+            throw new RuntimeException("Failed writing to HDFS file "
+                + checkpointedPath, ioe);
+          }
+
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("message count = " + checkpointMessageCount);
+          }
+        }
+      }
+    }
+
   }
 
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java?rev=1701715&r1=1701714&r2=1701715&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java Tue Sep  8 01:53:53 2015
@@ -129,7 +129,7 @@ public abstract class AbstractMessageMan
   public final int getNumCurrentMessages() {
     return localQueue.size();
   }
-  
+
   public void clearIncomingMessages() {
     localQueue.clear();
   }
@@ -145,13 +145,13 @@ public abstract class AbstractMessageMan
     if (conf.getBoolean(MessageQueue.PERSISTENT_QUEUE, false)
         && localQueue.size() > 0) {
 
-        // To reduce the number of element additions
-        if (localQueue.size() > localQueueForNextIteration.size()) {
-          localQueue.addAll(localQueueForNextIteration);
-        } else {
-          localQueueForNextIteration.addAll(localQueue);
-          localQueue = localQueueForNextIteration.getMessageQueue();
-        }
+      // To reduce the number of element additions
+      if (localQueue.size() > localQueueForNextIteration.size()) {
+        localQueue.addAll(localQueueForNextIteration);
+      } else {
+        localQueueForNextIteration.addAll(localQueue);
+        localQueue = localQueueForNextIteration.getMessageQueue();
+      }
 
     } else {
       if (localQueue != null) {
@@ -178,7 +178,7 @@ public abstract class AbstractMessageMan
 
     notifySentMessage(peerName, msg);
   }
-  
+
   /*
    * (non-Javadoc)
    * @see org.apache.hama.bsp.message.MessageManager#getMessageIterator()
@@ -239,6 +239,13 @@ public abstract class AbstractMessageMan
     }
   }
 
+  private void notifyReceivedMessage(BSPMessageBundle<M> bundle)
+      throws IOException {
+    for (MessageEventListener<M> aMessageListenerQueue : this.messageListenerQueue) {
+      aMessageListenerQueue.onBundleReceived(bundle);
+    }
+  }
+
   private void notifyInit() {
     for (MessageEventListener<M> aMessageListenerQueue : this.messageListenerQueue) {
       aMessageListenerQueue.onInitialized();
@@ -261,11 +268,10 @@ public abstract class AbstractMessageMan
 
   @Override
   public void loopBackBundle(BSPMessageBundle<M> bundle) throws IOException {
-    peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED, bundle.size());
+    peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED,
+        bundle.size());
     this.localQueueForNextIteration.addBundle(bundle);
-    
-    // TODO checkpoint bundle itself instead of unpacked messages. -- edwardyoon
-    // notifyReceivedMessage(bundle);
+    notifyReceivedMessage(bundle);
   }
 
   @SuppressWarnings("unchecked")

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageEventListener.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageEventListener.java?rev=1701715&r1=1701714&r2=1701715&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageEventListener.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageEventListener.java Tue Sep  8 01:53:53 2015
@@ -17,7 +17,10 @@
  */
 package org.apache.hama.bsp.message;
 
-public interface MessageEventListener<M> {
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.BSPMessageBundle;
+
+public interface MessageEventListener<M extends Writable> {
 
   /**
    * 
@@ -48,6 +51,8 @@ public interface MessageEventListener<M>
    * @param message The message received.
    */
   void onMessageReceived(final M message);
+  
+  void onBundleReceived(final BSPMessageBundle<M> bundle);
 
   /**
    * The function to handle the event when the queue is closed.

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java?rev=1701715&r1=1701714&r2=1701715&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java Tue Sep  8 01:53:53 2015
@@ -38,6 +38,8 @@ public class TestPersistQueue extends Te
   public static final Log LOG = LogFactory.getLog(TestPartitioning.class);
 
   public void testMemoryQueue() throws Exception {
+    BSPMessageBundle<IntWritable> x = new BSPMessageBundle<IntWritable>();
+    System.out.println(x.getClass().getCanonicalName() + ", " + BSPMessageBundle.class.getCanonicalName());
     BSPJob bsp = getNewJobConf();
     bsp.set(MessageManager.RECEIVE_QUEUE_TYPE_CLASS,
         "org.apache.hama.bsp.message.queue.MemoryQueue");

Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/RandBench.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/RandBench.java?rev=1701715&r1=1701714&r2=1701715&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/RandBench.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/RandBench.java Tue Sep  8 01:53:53 2015
@@ -55,7 +55,7 @@ public class RandBench {
       byte[] dummyData = new byte[sizeOfMsg];
       String[] peers = peer.getAllPeerNames();
 
-      for (int i = 0; i < nSupersteps; i++) {
+      for (int i = (int) peer.getSuperstepCount(); i < nSupersteps; i++) {
 
         for (int j = 0; j < nCommunications; j++) {
           String tPeer = peers[r.nextInt(peers.length)];

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1701715&r1=1701714&r2=1701715&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Tue Sep  8 01:53:53 2015
@@ -145,10 +145,12 @@ public final class GraphJobRunner<V exte
     LOG.info("Total time spent for broadcasting global vertex count: "
         + (System.currentTimeMillis() - startTime) + " ms");
 
-    startTime = System.currentTimeMillis();
-    doInitialSuperstep(peer);
-    LOG.info("Total time spent for initial superstep: "
-        + (System.currentTimeMillis() - startTime) + " ms");
+    if (peer.getSuperstepCount() == 2) {
+      startTime = System.currentTimeMillis();
+      doInitialSuperstep(peer);
+      LOG.info("Total time spent for initial superstep: "
+          + (System.currentTimeMillis() - startTime) + " ms");
+    }
   }
 
   @Override
@@ -760,10 +762,10 @@ public final class GraphJobRunner<V exte
 
   public Iterable<Writable> getIterableMessages(final byte[] valuesBytes,
       final int numOfValues) {
-    
+
     return new Iterable<Writable>() {
       DataInputStream dis;
-      
+
       @Override
       public Iterator<Writable> iterator() {
         if (!conf.getBoolean("hama.use.unsafeserialization", false)) {
@@ -771,7 +773,7 @@ public final class GraphJobRunner<V exte
         } else {
           dis = new DataInputStream(new UnsafeByteArrayInputStream(valuesBytes));
         }
-        
+
         return new Iterator<Writable>() {
           int index = 0;
 



Mime
View raw message