hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1681684 - /hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
Date Tue, 26 May 2015 04:49:26 GMT
Author: edwardyoon
Date: Tue May 26 04:49:26 2015
New Revision: 1681684

URL: http://svn.apache.org/r1681684
Log:
Provide more exception details

Modified:
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1681684&r1=1681683&r2=1681684&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Tue May 26 04:49:26 2015
@@ -227,7 +227,7 @@ public final class GraphJobRunner<V exte
         combiner = (Combiner<Writable>) ReflectionUtils
             .newInstance(combinerName);
       } catch (ClassNotFoundException e) {
-        e.printStackTrace();
+        throw new IOException(e);
       }
     }
   }
@@ -242,6 +242,7 @@ public final class GraphJobRunner<V exte
   private void doSuperstep(GraphJobMessage currentMessage,
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException {
+    this.errorCount = 0;
     long startTime = System.currentTimeMillis();
 
     this.changedVertexCnt = 0;
@@ -254,8 +255,7 @@ public final class GraphJobRunner<V exte
 
     long loopStartTime = System.currentTimeMillis();
     while (currentMessage != null) {
-      Runnable worker = new ComputeRunnable(currentMessage);
-      executor.execute(worker);
+      executor.execute(new ComputeRunnable(currentMessage));
 
       currentMessage = peer.getCurrentMessage();
     }
@@ -266,7 +266,12 @@ public final class GraphJobRunner<V exte
     try {
       executor.awaitTermination(60, TimeUnit.SECONDS);
     } catch (InterruptedException e) {
-      LOG.error(e);
+      throw new IOException(e);
+    }
+
+    if (errorCount > 0) {
+      throw new IOException("there were " + errorCount
+          + " exceptions during compute vertices.");
     }
 
     Iterator it = vertices.iterator();
@@ -300,6 +305,7 @@ public final class GraphJobRunner<V exte
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException {
     this.changedVertexCnt = 0;
+    this.errorCount = 0;
     vertices.startSuperstep();
 
     ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors
@@ -308,45 +314,45 @@ public final class GraphJobRunner<V exte
     executor.setRejectedExecutionHandler(retryHandler);
 
     for (V v : vertices.keySet()) {
-      Runnable worker = new ComputeRunnable(v);
-      executor.execute(worker);
+      executor.execute(new ComputeRunnable(v));
     }
 
     executor.shutdown();
     try {
       executor.awaitTermination(60, TimeUnit.SECONDS);
     } catch (InterruptedException e) {
-      LOG.error(e);
+      throw new IOException(e);
     }
 
+    if (errorCount > 0) {
+      throw new IOException("there were " + errorCount
+          + " exceptions during compute vertices.");
+    }
+    
     getAggregationRunner().sendAggregatorValues(peer, 1, this.changedVertexCnt);
     iteration++;
     finishSuperstep();
   }
 
+  private int errorCount = 0;
+
+  public synchronized void incrementErrorCount() {
+    errorCount++;
+  }
+
   class ComputeRunnable implements Runnable {
     Vertex<V, E, M> vertex;
     Iterable<M> msgs;
 
     @SuppressWarnings("unchecked")
-    public ComputeRunnable(GraphJobMessage msg) {
-      try {
-        this.vertex = vertices.get((V) msg.getVertexId());
-      } catch (IOException e) {
-        // TODO Auto-generated catch block
-        e.printStackTrace();
-      }
+    public ComputeRunnable(GraphJobMessage msg) throws IOException {
+      this.vertex = vertices.get((V) msg.getVertexId());
       this.msgs = (Iterable<M>) getIterableMessages(msg.getValuesBytes(),
           msg.getNumOfValues());
     }
 
-    public ComputeRunnable(V v) {
-      try {
-        this.vertex = vertices.get(v);
-      } catch (IOException e) {
-        // TODO Auto-generated catch block
-        e.printStackTrace();
-      }
+    public ComputeRunnable(V v) throws IOException {
+      this.vertex = vertices.get(v);
     }
 
     @Override
@@ -361,7 +367,8 @@ public final class GraphJobRunner<V exte
         vertex.compute(msgs);
         vertices.finishVertexComputation(vertex);
       } catch (IOException e) {
-        e.printStackTrace();
+        incrementErrorCount();
+        throw new RuntimeException(e);
       }
     }
   }
@@ -456,7 +463,7 @@ public final class GraphJobRunner<V exte
         vertexFinished = reader.parseVertex(next.getKey(), next.getValue(),
             vertex);
       } catch (Exception e) {
-        e.printStackTrace();
+        throw new IOException("Parse exception occured: " + e);
       }
 
       if (!vertexFinished) {
@@ -518,7 +525,7 @@ public final class GraphJobRunner<V exte
 
           addVertex(vertex);
         } catch (IOException e) {
-          e.printStackTrace();
+          throw new RuntimeException(e);
         }
       }
     }
@@ -543,7 +550,7 @@ public final class GraphJobRunner<V exte
           messages.get(partition).add(WritableUtils.serialize(vertex));
         }
       } catch (Exception e) {
-        e.printStackTrace();
+        throw new RuntimeException(e);
       }
     }
   }
@@ -742,7 +749,7 @@ public final class GraphJobRunner<V exte
             try {
               v.readFields(dis);
             } catch (IOException e) {
-              e.printStackTrace();
+              throw new RuntimeException(e);
             }
             index++;
             return v;



Mime
View raw message