hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tjungb...@apache.org
Subject svn commit: r1384022 - in /hama/trunk/core/src: main/java/org/apache/hama/bsp/BSPPeer.java main/java/org/apache/hama/bsp/BSPPeerImpl.java test/java/org/apache/hama/bsp/TestCheckpoint.java test/java/org/apache/hama/bsp/sync/TestSyncServiceFactory.java
Date Wed, 12 Sep 2012 16:27:45 GMT
Author: tjungblut
Date: Wed Sep 12 16:27:45 2012
New Revision: 1384022

URL: http://svn.apache.org/viewvc?rev=1384022&view=rev
Log:
Several small fixes

Modified:
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/sync/TestSyncServiceFactory.java

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java?rev=1384022&r1=1384021&r2=1384022&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java Wed Sep 12 16:27:45 2012
@@ -27,10 +27,9 @@ import org.apache.hama.bsp.sync.SyncExce
 import org.apache.hama.util.KeyValuePair;
 
 /**
- * BSP communication interface.
- * Reads key-value inputs, with K1 typed keys and V1 typed values.
- * Collects key-value outputs, with k2 typed keys and V2 typed values.
- * Exchange messages with other {@link BSPPeer}s via messages of type M.
+ * BSP communication interface. Reads key-value inputs, with K1 typed keys and
+ * V1 typed values. Collects key-value outputs, with k2 typed keys and V2 typed
+ * values. Exchange messages with other {@link BSPPeer}s via messages of type M.
  */
 public interface BSPPeer<K1, V1, K2, V2, M extends Writable> extends Constants {
 
@@ -186,10 +185,15 @@ public interface BSPPeer<K1, V1, K2, V2,
    * @return the size of assigned split
    */
   public long getSplitSize();
-  
+
   /**
    * @return the current position of the file read pointer
    * @throws IOException
    */
   public long getPos() throws IOException;
+
+  /**
+   * @return the task id of this task.
+   */
+  public TaskAttemptID getTaskId();
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1384022&r1=1384021&r2=1384022&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Wed Sep 12 16:27:45 2012
@@ -96,7 +96,7 @@ public final class BSPPeerImpl<K1, V1, K
   private FaultTolerantPeerService<M> faultToleranceService;
 
   private long splitSize = 0L;
-  
+
   /**
    * Protected default constructor for LocalBSPRunner.
    */
@@ -232,6 +232,10 @@ public final class BSPPeerImpl<K1, V1, K
         }
       }
     }
+
+    // init the internal state
+    initialize();
+
     doFirstSync(superstep);
 
     if (LOG.isDebugEnabled()) {
@@ -281,7 +285,7 @@ public final class BSPPeerImpl<K1, V1, K
   }
 
   @SuppressWarnings("unchecked")
-  public final void initialize() throws Exception {
+  private final void initialize() throws Exception {
 
     initInput();
 
@@ -348,7 +352,7 @@ public final class BSPPeerImpl<K1, V1, K
   public long getSplitSize() {
     return splitSize;
   }
-  
+
   /**
    * @return the position in the input stream.
    */
@@ -356,7 +360,7 @@ public final class BSPPeerImpl<K1, V1, K
   public long getPos() throws IOException {
     return in.getPos();
   }
-  
+
   public final void initilizeMessaging() throws ClassNotFoundException {
     messenger = MessageManagerFactory.getMessageManager(conf);
     messenger.init(taskId, this, conf, peerAddress);
@@ -463,7 +467,7 @@ public final class BSPPeerImpl<K1, V1, K
     messenger.clearOutgoingQueues();
 
     leaveBarrier();
-    
+
     incrementCounter(PeerCounter.TIME_IN_SYNC_MS,
         (System.currentTimeMillis() - startBarrier));
     incrementCounter(PeerCounter.SUPERSTEP_SUM, 1L);
@@ -479,7 +483,7 @@ public final class BSPPeerImpl<K1, V1, K
     }
 
     umbilical.statusUpdate(taskId, currentTaskStatus);
-    
+
   }
 
   private final BSPMessageBundle<M> combineMessages(Iterable<M> messages) {
@@ -702,4 +706,9 @@ public final class BSPPeerImpl<K1, V1, K
     }
   }
 
+  @Override
+  public TaskAttemptID getTaskId() {
+    return taskId;
+  }
+
 }

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java?rev=1384022&r1=1384021&r2=1384022&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java Wed Sep 12 16:27:45 2012
@@ -169,10 +169,9 @@ public class TestCheckpoint extends Test
         superstepCount = 0L;
 
       try {
-        fService = (new AsyncRcvdMsgCheckpointImpl<Text>()).constructPeerFaultTolerance(
-            job, (BSPPeer<?, ?, ?, ?, Text>) this,
-            (BSPPeerSyncClient) syncClient, null, taskId, superstep, conf,
-            messenger);
+        fService = (new AsyncRcvdMsgCheckpointImpl<Text>())
+            .constructPeerFaultTolerance(job, this, syncClient, null, taskId,
+                superstep, conf, messenger);
         this.fService.onPeerInitialized(state);
       } catch (Exception e) {
         e.printStackTrace();
@@ -288,16 +287,19 @@ public class TestCheckpoint extends Test
 
     @Override
     public long getSplitSize() {
-      // TODO Auto-generated method stub
       return 0;
     }
 
     @Override
     public long getPos() throws IOException {
-      // TODO Auto-generated method stub
       return 0;
     }
 
+    @Override
+    public TaskAttemptID getTaskId() {
+      return null;
+    }
+
   }
 
   public static class TempSyncClient extends BSPPeerSyncClient {
@@ -328,13 +330,12 @@ public class TestCheckpoint extends Test
     }
 
     @Override
-    public boolean getInformation(String key,
-        Writable valueHolder) {
+    public boolean getInformation(String key, Writable valueHolder) {
       LOG.info("Getting value for key " + key);
-      if(!valueMap.containsKey(key)){
+      if (!valueMap.containsKey(key)) {
         return false;
       }
-      Writable value =  valueMap.get(key);
+      Writable value = valueMap.get(key);
       ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
       DataOutputStream outputStream = new DataOutputStream(byteStream);
       byte[] data = null;
@@ -440,19 +441,19 @@ public class TestCheckpoint extends Test
 
   }
 
-  private void checkSuperstepMsgCount(PeerSyncClient syncClient,
+  private static void checkSuperstepMsgCount(PeerSyncClient syncClient,
       @SuppressWarnings("rawtypes")
       BSPPeer bspTask, BSPJob job, long step, long count) {
-    
+
     ArrayWritable writableVal = new ArrayWritable(LongWritable.class);
-    
+
     boolean result = syncClient.getInformation(
         syncClient.constructKey(job.getJobID(), "checkpoint",
             "" + bspTask.getPeerIndex()), writableVal);
-    
+
     assertTrue(result);
 
-    LongWritable superstepNo = (LongWritable) writableVal.get()[0]; 
+    LongWritable superstepNo = (LongWritable) writableVal.get()[0];
     LongWritable msgCount = (LongWritable) writableVal.get()[1];
 
     assertEquals(step, superstepNo.get());
@@ -477,8 +478,7 @@ public class TestCheckpoint extends Test
     TaskAttemptID taskId = new TaskAttemptID(new TaskID(job.getJobID(), 1), 1);
 
     TestMessageManager messenger = new TestMessageManager();
-    PeerSyncClient syncClient = (TempSyncClient) SyncServiceFactory
-        .getPeerSyncClient(config);
+    PeerSyncClient syncClient = SyncServiceFactory.getPeerSyncClient(config);
     @SuppressWarnings("rawtypes")
     BSPPeer bspTask = new TestBSPPeer(job, config, taskId, new Counters(), -1L,
         (BSPPeerSyncClient) syncClient, messenger, TaskStatus.State.RUNNING);
@@ -489,15 +489,15 @@ public class TestCheckpoint extends Test
     int port = BSPNetUtils.getFreePort(12502);
     LOG.info("Got port = " + port);
 
-    boolean result = syncClient.getInformation(
-            syncClient.constructKey(job.getJobID(), "checkpoint",
-                "" + bspTask.getPeerIndex()), new ArrayWritable(LongWritable.class));
+    boolean result = syncClient
+        .getInformation(syncClient.constructKey(job.getJobID(), "checkpoint",
+            "" + bspTask.getPeerIndex()), new ArrayWritable(LongWritable.class));
 
     assertFalse(result);
 
     bspTask.sync();
     // Superstep 1
-  
+
     checkSuperstepMsgCount(syncClient, bspTask, job, 1L, 0L);
 
     Text txtMessage = new Text("data");
@@ -559,8 +559,7 @@ public class TestCheckpoint extends Test
     TaskAttemptID taskId = new TaskAttemptID(new TaskID(job.getJobID(), 1), 1);
 
     TestMessageManager messenger = new TestMessageManager();
-    PeerSyncClient syncClient = (TempSyncClient) SyncServiceFactory
-        .getPeerSyncClient(config);
+    PeerSyncClient syncClient = SyncServiceFactory.getPeerSyncClient(config);
     BSPPeer bspTask = new TestBSPPeer(job, config, taskId, new Counters(), -1L,
         (BSPPeerSyncClient) syncClient, messenger, TaskStatus.State.RUNNING);
 
@@ -578,7 +577,7 @@ public class TestCheckpoint extends Test
 
     Text txtMessage = new Text("data");
     messenger.addMessage(txtMessage);
-    
+
     bspTask.sync();
 
     LOG.info("Completed second sync.");
@@ -620,8 +619,7 @@ public class TestCheckpoint extends Test
     TaskAttemptID taskId = new TaskAttemptID(new TaskID(job.getJobID(), 1), 1);
 
     TestMessageManager messenger = new TestMessageManager();
-    PeerSyncClient syncClient = (TempSyncClient) SyncServiceFactory
-        .getPeerSyncClient(config);
+    PeerSyncClient syncClient = SyncServiceFactory.getPeerSyncClient(config);
 
     Text txtMessage = new Text("data");
     String writeKey = "job_checkpttest_0001/checkpoint/1/";

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/sync/TestSyncServiceFactory.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/sync/TestSyncServiceFactory.java?rev=1384022&r1=1384021&r2=1384022&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/sync/TestSyncServiceFactory.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/sync/TestSyncServiceFactory.java Wed Sep 12 16:27:45 2012
@@ -122,7 +122,7 @@ public class TestSyncServiceFactory exte
 
     Thread.sleep(1000);
 
-    final PeerSyncClient syncClient = (PeerSyncClient) SyncServiceFactory
+    final PeerSyncClient syncClient = SyncServiceFactory
         .getPeerSyncClient(conf);
     assertTrue(syncClient instanceof ZooKeeperSyncClientImpl);
     BSPJobID jobId = new BSPJobID("abc", 1);



Mime
View raw message