hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From millec...@apache.org
Subject svn commit: r1557100 [2/2] - in /hama/trunk: ./ c++/src/main/native/examples/conf/ c++/src/main/native/examples/impl/ c++/src/main/native/pipes/api/hama/ c++/src/main/native/pipes/impl/ core/src/main/java/org/apache/hama/pipes/ core/src/main/java/org/a...
Date Fri, 10 Jan 2014 11:59:46 GMT
Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesApplication.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesApplication.java?rev=1557100&r1=1557099&r2=1557100&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesApplication.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesApplication.java Fri Jan 10 11:59:46 2014
@@ -53,8 +53,8 @@ import org.apache.hama.pipes.protocol.St
  * 
  */
 public class PipesApplication<K1, V1, K2, V2, M extends Writable> {
-
   private static final Log LOG = LogFactory.getLog(PipesApplication.class);
+  private static final int SERVER_SOCKET_TIMEOUT = 2000;
   private ServerSocket serverSocket;
   private Process process;
   private Socket clientSocket;
@@ -85,11 +85,9 @@ public class PipesApplication<K1, V1, K2
     // add TMPDIR environment variable with the value of java.io.tmpdir
     env.put("TMPDIR", System.getProperty("java.io.tmpdir"));
 
-    /* Set Logging Environment from Configuration */
+    // Set Logging Environment from Configuration
     env.put("hama.pipes.logging",
         conf.getBoolean("hama.pipes.logging", false) ? "1" : "0");
-    LOG.debug("DEBUG hama.pipes.logging: "
-        + conf.getBoolean("hama.pipes.logging", false));
 
     return env;
   }
@@ -213,7 +211,7 @@ public class PipesApplication<K1, V1, K2
       if (!streamingEnabled) {
         LOG.debug("DEBUG: waiting for Client at "
             + serverSocket.getLocalSocketAddress());
-        serverSocket.setSoTimeout(2000);
+        serverSocket.setSoTimeout(SERVER_SOCKET_TIMEOUT);
         clientSocket = serverSocket.accept();
         LOG.debug("DEBUG: Client connected! - start BinaryProtocol!");
 
@@ -234,7 +232,7 @@ public class PipesApplication<K1, V1, K2
       br.close();
 
       throw new SocketException(
-          "Timout: Client pipes application was not connecting!");
+          "Timout: Client pipes application did not connect!");
     }
   }
 
@@ -284,7 +282,7 @@ public class PipesApplication<K1, V1, K2
       } else {
         LOG.debug("DEBUG: waiting for Client at "
             + serverSocket.getLocalSocketAddress());
-        serverSocket.setSoTimeout(2000);
+        serverSocket.setSoTimeout(SERVER_SOCKET_TIMEOUT);
         clientSocket = serverSocket.accept();
         LOG.debug("DEBUG: Client connected! - start BinaryProtocol!");
 
@@ -305,7 +303,7 @@ public class PipesApplication<K1, V1, K2
       br.close();
 
       throw new SocketException(
-          "Timout: Client pipes application was not connecting!");
+          "Timout: Client pipes application did not connect!");
     }
   }
 

Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesBSP.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesBSP.java?rev=1557100&r1=1557099&r2=1557100&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesBSP.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/PipesBSP.java Fri Jan 10 11:59:46 2014
@@ -19,8 +19,6 @@ package org.apache.hama.pipes;
 
 import java.io.IOException;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hama.bsp.BSP;
@@ -34,24 +32,27 @@ import org.apache.hama.bsp.sync.SyncExce
 public class PipesBSP<K1 extends Writable, V1 extends Writable, K2 extends Writable, V2 extends Writable, M extends Writable>
     extends BSP<K1, V1, K2, V2, BytesWritable> {
 
-  private static final Log LOG = LogFactory.getLog(PipesBSP.class);
   private PipesApplication<K1, V1, K2, V2, BytesWritable> application = new PipesApplication<K1, V1, K2, V2, BytesWritable>();
+  private boolean applicationIsAlive = true;
 
   @Override
   public void setup(BSPPeer<K1, V1, K2, V2, BytesWritable> peer)
       throws IOException, SyncException, InterruptedException {
 
-    this.application.start(peer);
+    try {
+      this.application.start(peer);
 
-    this.application.getDownlink().runSetup(false, false);
+      this.application.getDownlink().runSetup();
 
-    try {
       this.application.waitForFinish();
+
     } catch (IOException e) {
-      LOG.error(e);
+      this.application.cleanup(false);
       throw e;
+
     } catch (InterruptedException e) {
-      e.printStackTrace();
+      this.application.cleanup(false);
+      throw e;
     }
   }
 
@@ -59,15 +60,20 @@ public class PipesBSP<K1 extends Writabl
   public void bsp(BSPPeer<K1, V1, K2, V2, BytesWritable> peer)
       throws IOException, SyncException, InterruptedException {
 
-    this.application.getDownlink().runBsp(false, false);
-
     try {
+      this.application.getDownlink().runBsp();
+
       this.application.waitForFinish();
+
     } catch (IOException e) {
-      LOG.error(e);
+      applicationIsAlive = false;
+      this.application.cleanup(false);
       throw e;
+
     } catch (InterruptedException e) {
-      e.printStackTrace();
+      applicationIsAlive = false;
+      this.application.cleanup(false);
+      throw e;
     }
   }
 
@@ -83,17 +89,24 @@ public class PipesBSP<K1 extends Writabl
   public void cleanup(BSPPeer<K1, V1, K2, V2, BytesWritable> peer)
       throws IOException {
 
-    application.getDownlink().runCleanup(false, false);
-
     try {
-      this.application.waitForFinish();
+      if (applicationIsAlive) {
+
+        this.application.getDownlink().runCleanup();
+
+        this.application.waitForFinish();
+
+        this.application.cleanup(true);
+      }
     } catch (IOException e) {
-      LOG.error(e);
+      applicationIsAlive = false;
+      this.application.cleanup(false);
       throw e;
+
     } catch (InterruptedException e) {
-      e.printStackTrace();
-    } finally {
-      this.application.cleanup(true);
+      applicationIsAlive = false;
+      this.application.cleanup(false);
+      throw new IOException(e);
     }
   }
 

Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java?rev=1557100&r1=1557099&r2=1557100&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/Submitter.java Fri Jan 10 11:59:46 2014
@@ -109,46 +109,6 @@ public class Submitter implements Tool {
   }
 
   /**
-   * Set whether the job is using a Java RecordReader.
-   * 
-   * @param conf the configuration to modify
-   * @param value the new value
-   */
-  public static void setIsJavaRecordReader(Configuration conf, boolean value) {
-    conf.setBoolean("hama.pipes.java.recordreader", value);
-  }
-
-  /**
-   * Check whether the job is using a Java RecordReader
-   * 
-   * @param conf the configuration to check
-   * @return is it a Java RecordReader?
-   */
-  public static boolean getIsJavaRecordReader(Configuration conf) {
-    return conf.getBoolean("hama.pipes.java.recordreader", false);
-  }
-
-  /**
-   * Set whether the job will use a Java RecordWriter.
-   * 
-   * @param conf the configuration to modify
-   * @param value the new value to set
-   */
-  public static void setIsJavaRecordWriter(Configuration conf, boolean value) {
-    conf.setBoolean("hama.pipes.java.recordwriter", value);
-  }
-
-  /**
-   * Will the job use a Java RecordWriter?
-   * 
-   * @param conf the configuration to check
-   * @return true, if the output of the job will be written by Java
-   */
-  public static boolean getIsJavaRecordWriter(Configuration conf) {
-    return conf.getBoolean("hama.pipes.java.recordwriter", false);
-  }
-
-  /**
    * Set the configuration, if it doesn't already have a value for the given
    * key.
    * 
@@ -237,8 +197,6 @@ public class Submitter implements Tool {
     setIfUnset(job.getConfiguration(), "bsp.job.name", "Hama Pipes Job");
 
     // DEBUG Output
-    LOG.debug("isJavaRecordReader: "
-        + getIsJavaRecordReader(job.getConfiguration()));
     LOG.debug("BspClass: " + job.getBspClass().getName());
     // conf.setInputFormat(NLineInputFormat.class);
     LOG.debug("InputFormat: " + job.getInputFormat());
@@ -440,7 +398,6 @@ public class Submitter implements Tool {
       }
 
       if (results.hasOption("inputformat")) {
-        setIsJavaRecordReader(job.getConfiguration(), true);
         job.setInputFormat(getClass(results, "inputformat", conf,
             InputFormat.class));
       }
@@ -451,7 +408,6 @@ public class Submitter implements Tool {
       }
 
       if (results.hasOption("outputformat")) {
-        setIsJavaRecordWriter(job.getConfiguration(), true);
         job.setOutputFormat(getClass(results, "outputformat", conf,
             OutputFormat.class));
       }

Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/BinaryProtocol.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/BinaryProtocol.java?rev=1557100&r1=1557099&r2=1557100&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/BinaryProtocol.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/BinaryProtocol.java Fri Jan 10 11:59:46 2014
@@ -37,6 +37,7 @@ import org.apache.hadoop.io.LongWritable
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.pipes.Submitter;
 
@@ -61,6 +62,7 @@ public class BinaryProtocol<K1, V1, K2, 
 
   public final Object hasTaskLock = new Object();
   private boolean hasTask = false;
+  private Throwable uplinkException = null;
   public final Object resultLock = new Object();
   private Integer resultInt = null;
 
@@ -128,6 +130,10 @@ public class BinaryProtocol<K1, V1, K2, 
     return new UplinkReader<K1, V1, K2, V2, M>(this, peer, in);
   }
 
+  public void setUplinkException(Throwable e) {
+    this.uplinkException = e;
+  }
+
   public boolean isHasTask() {
     return this.hasTask;
   }
@@ -223,36 +229,24 @@ public class BinaryProtocol<K1, V1, K2, 
   }
 
   @Override
-  public void runSetup(boolean pipedInput, boolean pipedOutput)
-      throws IOException {
-
+  public void runSetup() throws IOException {
     WritableUtils.writeVInt(this.outStream, MessageType.RUN_SETUP.code);
-    WritableUtils.writeVInt(this.outStream, pipedInput ? 1 : 0);
-    WritableUtils.writeVInt(this.outStream, pipedOutput ? 1 : 0);
     flush();
     setHasTask(true);
     LOG.debug("Sent MessageType.RUN_SETUP");
   }
 
   @Override
-  public void runBsp(boolean pipedInput, boolean pipedOutput)
-      throws IOException {
-
+  public void runBsp() throws IOException {
     WritableUtils.writeVInt(this.outStream, MessageType.RUN_BSP.code);
-    WritableUtils.writeVInt(this.outStream, pipedInput ? 1 : 0);
-    WritableUtils.writeVInt(this.outStream, pipedOutput ? 1 : 0);
     flush();
     setHasTask(true);
     LOG.debug("Sent MessageType.RUN_BSP");
   }
 
   @Override
-  public void runCleanup(boolean pipedInput, boolean pipedOutput)
-      throws IOException {
-
+  public void runCleanup() throws IOException {
     WritableUtils.writeVInt(this.outStream, MessageType.RUN_CLEANUP.code);
-    WritableUtils.writeVInt(this.outStream, pipedInput ? 1 : 0);
-    WritableUtils.writeVInt(this.outStream, pipedOutput ? 1 : 0);
     flush();
     setHasTask(true);
     LOG.debug("Sent MessageType.RUN_CLEANUP");
@@ -279,7 +273,7 @@ public class BinaryProtocol<K1, V1, K2, 
     synchronized (this.resultLock) {
       try {
         while (resultInt == null) {
-          this.resultLock.wait();
+          this.resultLock.wait(); // this call blocks
         }
 
         resultVal = resultInt;
@@ -329,17 +323,19 @@ public class BinaryProtocol<K1, V1, K2, 
 
   @Override
   public boolean waitForFinish() throws IOException, InterruptedException {
-    // LOG.debug("waitForFinish... "+hasTask);
+    // LOG.debug("waitForFinish... " + hasTask);
     synchronized (this.hasTaskLock) {
-      try {
-        while (this.hasTask)
-          this.hasTaskLock.wait();
 
-      } catch (InterruptedException e) {
-        LOG.error(e);
+      while (this.hasTask) {
+        this.hasTaskLock.wait(); // this call blocks
       }
-    }
 
+      // Check if UplinkReader thread has thrown exception
+      if (uplinkException != null) {
+        throw new InterruptedException(
+            StringUtils.stringifyException(uplinkException));
+      }
+    }
     return hasTask;
   }
 

Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/DownwardProtocol.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/DownwardProtocol.java?rev=1557100&r1=1557099&r2=1557100&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/DownwardProtocol.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/DownwardProtocol.java Fri Jan 10 11:59:46 2014
@@ -58,29 +58,23 @@ public interface DownwardProtocol<K1, V1
   /**
    * runSetup
    * 
-   * @param pipedInput use pipedInput
-   * @param pipedOutput use pipedOutput
    * @throws IOException
    */
-  void runSetup(boolean pipedInput, boolean pipedOutput) throws IOException;
+  void runSetup() throws IOException;
 
   /**
    * runBsp
    * 
-   * @param pipedInput use pipedInput
-   * @param pipedOutput use pipedOutput
    * @throws IOException
    */
-  void runBsp(boolean pipedInput, boolean pipedOutput) throws IOException;
+  void runBsp() throws IOException;
 
   /**
    * runCleanup
    * 
-   * @param pipedInput use pipedInput
-   * @param pipedOutput use pipedOutput
    * @throws IOException
    */
-  void runCleanup(boolean pipedInput, boolean pipedOutput) throws IOException;
+  void runCleanup() throws IOException;
 
   /**
    * getPartition

Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java?rev=1557100&r1=1557099&r2=1557100&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java Fri Jan 10 11:59:46 2014
@@ -244,15 +244,13 @@ public class StreamingProtocol<K1 extend
   }
 
   @Override
-  public void runSetup(boolean pipedInput, boolean pipedOutput)
-      throws IOException {
+  public void runSetup() throws IOException {
     writeLine(MessageType.RUN_SETUP, null);
     waitOnAck();
   }
 
   @Override
-  public void runBsp(boolean pipedInput, boolean pipedOutput)
-      throws IOException {
+  public void runBsp() throws IOException {
     writeLine(MessageType.RUN_BSP, null);
     waitOnAck();
   }
@@ -269,8 +267,7 @@ public class StreamingProtocol<K1 extend
   }
 
   @Override
-  public void runCleanup(boolean pipedInput, boolean pipedOutput)
-      throws IOException {
+  public void runCleanup() throws IOException {
     writeLine(MessageType.RUN_CLEANUP, null);
     waitOnAck();
   }

Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java?rev=1557100&r1=1557099&r2=1557100&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/protocol/UplinkReader.java Fri Jan 10 11:59:46 2014
@@ -24,8 +24,10 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.AbstractMap;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -35,7 +37,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -55,12 +56,14 @@ public class UplinkReader<KEYIN, VALUEIN
   private BinaryProtocol<KEYIN, VALUEIN, KEYOUT, VALUEOUT, M> binProtocol;
   private BSPPeer<KEYIN, VALUEIN, KEYOUT, VALUEOUT, M> peer = null;
   private Configuration conf;
+  private FileSystem fs;
 
   protected DataInputStream inStream;
   protected DataOutputStream outStream;
 
   private Map<Integer, Entry<SequenceFile.Reader, Entry<Writable, Writable>>> sequenceFileReaders;
   private Map<Integer, Entry<SequenceFile.Writer, Entry<Writable, Writable>>> sequenceFileWriters;
+  private Set<String> sequenceFileWriterPaths;
 
   public UplinkReader(
       BinaryProtocol<KEYIN, VALUEIN, KEYOUT, VALUEOUT, M> binaryProtocol,
@@ -68,6 +71,7 @@ public class UplinkReader<KEYIN, VALUEIN
 
     this.binProtocol = binaryProtocol;
     this.conf = conf;
+    this.fs = FileSystem.get(conf);
 
     this.inStream = new DataInputStream(new BufferedInputStream(stream,
         BinaryProtocol.BUFFER_SIZE));
@@ -76,6 +80,7 @@ public class UplinkReader<KEYIN, VALUEIN
 
     this.sequenceFileReaders = new HashMap<Integer, Entry<SequenceFile.Reader, Entry<Writable, Writable>>>();
     this.sequenceFileWriters = new HashMap<Integer, Entry<SequenceFile.Writer, Entry<Writable, Writable>>>();
+    this.sequenceFileWriterPaths = new HashSet<String>();
   }
 
   public UplinkReader(
@@ -99,10 +104,9 @@ public class UplinkReader<KEYIN, VALUEIN
         }
 
         int cmd = readCommand();
-        if (cmd == -1) {
-          continue;
-        }
-        LOG.debug("Handling uplink command: " + MessageType.values()[cmd]);
+        LOG.debug("Handling uplink command: " + cmd);
+        // MessageType.values()[cmd] may cause NullPointerException (bad
+        // command)
 
         if (cmd == MessageType.WRITE_KEYVALUE.code && isPeerAvailable()) { // INCOMING
           writeKeyValue();
@@ -165,10 +169,11 @@ public class UplinkReader<KEYIN, VALUEIN
         } else if (cmd == MessageType.PARTITION_RESPONSE.code) { // INCOMING
           partitionResponse();
         } else {
-          throw new IOException("Bad command code: " + cmd);
+          throw new Exception("Bad command code: " + cmd);
         }
 
       } catch (InterruptedException e) {
+        onError(e);
         return;
       } catch (Throwable e) {
         onError(e);
@@ -180,6 +185,13 @@ public class UplinkReader<KEYIN, VALUEIN
   // onError is overwritten by StreamingProtocol in Hama Streaming
   protected void onError(Throwable e) {
     LOG.error(StringUtils.stringifyException(e));
+
+    // notify binaryProtocol and set Exception
+    synchronized (binProtocol.hasTaskLock) {
+      binProtocol.setUplinkException(e);
+      binProtocol.setHasTask(false);
+      binProtocol.hasTaskLock.notify();
+    }
   }
 
   // readCommand is overwritten by StreamingProtocol in Hama Streaming
@@ -188,7 +200,20 @@ public class UplinkReader<KEYIN, VALUEIN
   }
 
   public void closeConnection() throws IOException {
+    // close input stream
     this.inStream.close();
+
+    // close open SequenceFileReaders
+    for (int fileID : this.sequenceFileReaders.keySet()) {
+      LOG.debug("close SequenceFileReader: " + fileID);
+      this.sequenceFileReaders.get(fileID).getKey().close();
+    }
+
+    // close open SequenceFileWriters
+    for (int fileID : this.sequenceFileWriters.keySet()) {
+      LOG.debug("close SequenceFileWriter: " + fileID);
+      this.sequenceFileWriters.get(fileID).getKey().close();
+    }
   }
 
   public void reopenInput() throws IOException {
@@ -261,14 +286,15 @@ public class UplinkReader<KEYIN, VALUEIN
 
   public void getAllPeerNames() throws IOException {
     LOG.debug("Got MessageType.GET_ALL_PEERNAME");
+    String[] peerNames = peer.getAllPeerNames();
     WritableUtils.writeVInt(this.outStream, MessageType.GET_ALL_PEERNAME.code);
-    WritableUtils.writeVInt(this.outStream, peer.getAllPeerNames().length);
-    for (String s : peer.getAllPeerNames()) {
+    WritableUtils.writeVInt(this.outStream, peerNames.length);
+    for (String s : peerNames) {
       Text.writeString(this.outStream, s);
     }
     binProtocol.flush();
     LOG.debug("Responded MessageType.GET_ALL_PEERNAME - peerNamesCount: "
-        + peer.getAllPeerNames().length);
+        + peerNames.length);
   }
 
   public void sync() throws IOException, SyncException, InterruptedException {
@@ -283,15 +309,18 @@ public class UplinkReader<KEYIN, VALUEIN
 
   public void getMessage() throws IOException {
     LOG.debug("Got MessageType.GET_MSG");
-    WritableUtils.writeVInt(this.outStream, MessageType.GET_MSG.code);
     Writable message = peer.getCurrentMessage();
     if (message != null) {
+      WritableUtils.writeVInt(this.outStream, MessageType.GET_MSG.code);
       binProtocol.writeObject(message);
+      LOG.debug("Responded MessageType.GET_MSG - Message: "
+          + ((message.toString().length() < 10) ? message.toString() : message
+              .toString().substring(0, 9) + "..."));
+    } else {
+      WritableUtils.writeVInt(this.outStream, MessageType.END_OF_DATA.code);
+      LOG.debug("Responded MessageType.END_OF_DATA");
     }
     binProtocol.flush();
-    LOG.debug("Responded MessageType.GET_MSG - Message: "
-        + ((message.toString().length() < 10) ? message.toString() : message
-            .toString().substring(0, 9) + "..."));
   }
 
   public void getMessageCount() throws IOException {
@@ -408,7 +437,7 @@ public class UplinkReader<KEYIN, VALUEIN
     WritableUtils.writeVInt(this.outStream, MessageType.WRITE_KEYVALUE.code);
     binProtocol.flush();
     LOG.debug("Responded MessageType.WRITE_KEYVALUE");
-    
+
     LOG.debug("Done MessageType.WRITE_KEYVALUE -"
         + " Key: "
         + ((keyOut.toString().length() < 10) ? keyOut.toString() : keyOut
@@ -432,31 +461,43 @@ public class UplinkReader<KEYIN, VALUEIN
 
     int fileID = -1;
 
-    FileSystem fs = FileSystem.get(conf);
     if (option.equals("r")) {
       SequenceFile.Reader reader;
       try {
         reader = new SequenceFile.Reader(fs, new Path(path), conf);
 
-        // try to load key and value class
-        Class<?> sequenceKeyClass = conf.getClassLoader().loadClass(keyClass);
-        Class<?> sequenceValueClass = conf.getClassLoader().loadClass(
-            valueClass);
-
-        // try to instantiate key and value class
-        Writable sequenceKeyWritable = (Writable) ReflectionUtils.newInstance(
-            sequenceKeyClass, conf);
-        Writable sequenceValueWritable = (Writable) ReflectionUtils
-            .newInstance(sequenceValueClass, conf);
-
-        // put new fileID and key and value Writable instances into HashMap
-        fileID = reader.hashCode();
-        sequenceFileReaders
-            .put(
-                fileID,
-                new AbstractMap.SimpleEntry<SequenceFile.Reader, Entry<Writable, Writable>>(
-                    reader, new AbstractMap.SimpleEntry<Writable, Writable>(
-                        sequenceKeyWritable, sequenceValueWritable)));
+        if (reader.getKeyClassName().equals(keyClass)
+            && reader.getValueClassName().equals(valueClass)) {
+          // try to load key and value class
+          Class<?> sequenceKeyClass = conf.getClassLoader().loadClass(keyClass);
+          Class<?> sequenceValueClass = conf.getClassLoader().loadClass(
+              valueClass);
+
+          // try to instantiate key and value class
+          Writable sequenceKeyWritable = (Writable) ReflectionUtils
+              .newInstance(sequenceKeyClass, conf);
+          Writable sequenceValueWritable = (Writable) ReflectionUtils
+              .newInstance(sequenceValueClass, conf);
+
+          // put new fileID and key and value Writable instances into HashMap
+          fileID = reader.hashCode();
+          this.sequenceFileReaders
+              .put(
+                  fileID,
+                  new AbstractMap.SimpleEntry<SequenceFile.Reader, Entry<Writable, Writable>>(
+                      reader, new AbstractMap.SimpleEntry<Writable, Writable>(
+                          sequenceKeyWritable, sequenceValueWritable)));
+
+        } else { // keyClass or valueClass is wrong
+          fileID = -1;
+          if (!reader.getKeyClassName().equals(keyClass)) {
+            LOG.error("SEQFILE_OPEN - Wrong KeyClass: " + keyClass
+                + " File KeyClass: " + reader.getKeyClassName());
+          } else {
+            LOG.error("SEQFILE_OPEN - Wrong ValueClass: " + valueClass
+                + " File ValueClass: " + reader.getValueClassName());
+          }
+        }
 
       } catch (IOException e) {
         LOG.error("SEQFILE_OPEN - " + e.getMessage());
@@ -469,29 +510,42 @@ public class UplinkReader<KEYIN, VALUEIN
     } else if (option.equals("w")) {
       SequenceFile.Writer writer;
       try {
-
-        // try to load key and value class
-        Class<?> sequenceKeyClass = conf.getClassLoader().loadClass(keyClass);
-        Class<?> sequenceValueClass = conf.getClassLoader().loadClass(
-            valueClass);
-
-        writer = new SequenceFile.Writer(fs, conf, new Path(path),
-            sequenceKeyClass, sequenceValueClass);
-
-        // try to instantiate key and value class
-        Writable sequenceKeyWritable = (Writable) ReflectionUtils.newInstance(
-            sequenceKeyClass, conf);
-        Writable sequenceValueWritable = (Writable) ReflectionUtils
-            .newInstance(sequenceValueClass, conf);
-
-        // put new fileID and key and value Writable instances into HashMap
-        fileID = writer.hashCode();
-        sequenceFileWriters
-            .put(
-                fileID,
-                new AbstractMap.SimpleEntry<SequenceFile.Writer, Entry<Writable, Writable>>(
-                    writer, new AbstractMap.SimpleEntry<Writable, Writable>(
-                        sequenceKeyWritable, sequenceValueWritable)));
+        // SequenceFile.Writer has an exclusive lease for a file
+        // No other client can write to this file until other Writer has
+        // completed
+        if (!this.sequenceFileWriterPaths.contains(path)) {
+
+          // try to load key and value class
+          Class<?> sequenceKeyClass = conf.getClassLoader().loadClass(keyClass);
+          Class<?> sequenceValueClass = conf.getClassLoader().loadClass(
+              valueClass);
+
+          // try to instantiate key and value class
+          Writable sequenceKeyWritable = (Writable) ReflectionUtils
+              .newInstance(sequenceKeyClass, conf);
+          Writable sequenceValueWritable = (Writable) ReflectionUtils
+              .newInstance(sequenceValueClass, conf);
+
+          writer = new SequenceFile.Writer(fs, conf, new Path(path),
+              sequenceKeyClass, sequenceValueClass);
+
+          // put new fileID and key and value Writable instances into HashMap
+          fileID = writer.hashCode();
+          this.sequenceFileWriters
+              .put(
+                  fileID,
+                  new AbstractMap.SimpleEntry<SequenceFile.Writer, Entry<Writable, Writable>>(
+                      writer, new AbstractMap.SimpleEntry<Writable, Writable>(
+                          sequenceKeyWritable, sequenceValueWritable)));
+
+          // add path to set (exclusive access)
+          this.sequenceFileWriterPaths.add(path);
+
+        } else { // Path was already opened by another SequenceFile.Writer
+          fileID = -1;
+          LOG.error("SEQFILE_OPEN - Path: " + path
+              + " is already used by another Writer!");
+        }
 
       } catch (IOException e) {
         LOG.error("SEQFILE_OPEN - " + e.getMessage());
@@ -515,7 +569,7 @@ public class UplinkReader<KEYIN, VALUEIN
     LOG.debug("GOT MessageType.SEQFILE_READNEXT - FileID: " + fileID);
 
     // check if fileID is available in sequenceFileReader
-    if (sequenceFileReaders.containsKey(fileID)) {
+    if (this.sequenceFileReaders.containsKey(fileID)) {
 
       Writable sequenceKeyWritable = sequenceFileReaders.get(fileID).getValue()
           .getKey();
@@ -523,7 +577,7 @@ public class UplinkReader<KEYIN, VALUEIN
           .getValue().getValue();
 
       // try to read next key/value pair from SequenceFile.Reader
-      if (sequenceFileReaders.get(fileID).getKey()
+      if (this.sequenceFileReaders.get(fileID).getKey()
           .next(sequenceKeyWritable, sequenceValueWritable)) {
 
         WritableUtils.writeVInt(this.outStream,
@@ -542,14 +596,14 @@ public class UplinkReader<KEYIN, VALUEIN
                 + "..."));
 
       } else { // false when at end of file
-
         WritableUtils.writeVInt(this.outStream, MessageType.END_OF_DATA.code);
         LOG.debug("Responded MessageType.SEQFILE_READNEXT - END_OF_DATA");
       }
       binProtocol.flush();
 
     } else { // no fileID stored
-      LOG.warn("SequenceFileReader: FileID " + fileID + " not found!");
+      LOG.error("MessageType.SEQFILE_READNEXT: FileID " + fileID
+          + " not found!");
       WritableUtils.writeVInt(this.outStream, MessageType.END_OF_DATA.code);
       LOG.debug("Responded MessageType.SEQFILE_READNEXT - END_OF_DATA");
       binProtocol.flush();
@@ -563,7 +617,7 @@ public class UplinkReader<KEYIN, VALUEIN
     boolean result = false;
 
     // check if fileID is available in sequenceFileWriter
-    if (sequenceFileWriters.containsKey(fileID)) {
+    if (this.sequenceFileWriters.containsKey(fileID)) {
 
       Writable sequenceKeyWritable = sequenceFileWriters.get(fileID).getValue()
           .getKey();
@@ -577,7 +631,7 @@ public class UplinkReader<KEYIN, VALUEIN
       if ((sequenceKeyWritable != null) && (sequenceValueWritable != null)) {
 
         // append to sequenceFile
-        sequenceFileWriters.get(fileID).getKey()
+        this.sequenceFileWriters.get(fileID).getKey()
             .append(sequenceKeyWritable, sequenceValueWritable);
 
         LOG.debug("Stored data: Key: "
@@ -591,6 +645,13 @@ public class UplinkReader<KEYIN, VALUEIN
 
         result = true;
       }
+    } else { // no fileID stored
+
+      // Skip written data from InputStream
+      int availableBytes = this.inStream.available();
+      this.inStream.skip(availableBytes);
+      LOG.debug("MessageType.SEQFILE_APPEND: skip " + availableBytes + " bytes");
+      LOG.error("MessageType.SEQFILE_APPEND: FileID " + fileID + " not found!");
     }
 
     // RESPOND
@@ -606,12 +667,16 @@ public class UplinkReader<KEYIN, VALUEIN
 
     boolean result = false;
 
-    if (sequenceFileReaders.containsKey(fileID)) {
-      sequenceFileReaders.get(fileID).getKey().close();
+    if (this.sequenceFileReaders.containsKey(fileID)) {
+      this.sequenceFileReaders.get(fileID).getKey().close();
+      this.sequenceFileReaders.remove(fileID);
       result = true;
-    } else if (sequenceFileWriters.containsKey(fileID)) {
-      sequenceFileWriters.get(fileID).getKey().close();
+    } else if (this.sequenceFileWriters.containsKey(fileID)) {
+      this.sequenceFileWriters.get(fileID).getKey().close();
+      this.sequenceFileWriters.remove(fileID);
       result = true;
+    } else { // no fileID stored
+      LOG.error("MessageType.SEQFILE_CLOSE: FileID " + fileID + " not found!");
     }
 
     // RESPOND
@@ -663,9 +728,6 @@ public class UplinkReader<KEYIN, VALUEIN
     } else if (obj instanceof LongWritable) {
       ((LongWritable) obj).set(WritableUtils.readVLong(this.inStream));
 
-    } else if (obj instanceof NullWritable) {
-      throw new IOException("Cannot read data into NullWritable!");
-
     } else {
       try {
         LOG.debug("reading type: " + obj.getClass().getName());

Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/util/SequenceFileDumper.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/util/SequenceFileDumper.java?rev=1557100&r1=1557099&r2=1557100&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/util/SequenceFileDumper.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/util/SequenceFileDumper.java Fri Jan 10 11:59:46 2014
@@ -107,6 +107,10 @@ public class SequenceFileDumper {
         Path path = new Path(cmdLine.getOptionValue("file"));
 
         FileSystem fs = FileSystem.get(path.toUri(), conf);
+        if (!fs.isFile(path)) {
+          System.out.println("File does not exist: " + path.toString());
+          return;
+        }
         SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
 
         Writer writer;

Modified: hama/trunk/core/src/test/java/org/apache/hama/pipes/TestPipes.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/pipes/TestPipes.java?rev=1557100&r1=1557099&r2=1557100&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/pipes/TestPipes.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/pipes/TestPipes.java Fri Jan 10 11:59:46 2014
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hama.Constants;
@@ -44,6 +45,7 @@ import org.apache.hama.bsp.ClusterStatus
 import org.apache.hama.bsp.FileInputFormat;
 import org.apache.hama.bsp.FileOutputFormat;
 import org.apache.hama.bsp.KeyValueTextInputFormat;
+import org.apache.hama.bsp.NullInputFormat;
 import org.apache.hama.bsp.SequenceFileInputFormat;
 import org.apache.hama.bsp.SequenceFileOutputFormat;
 import org.apache.hama.bsp.message.MessageManager;
@@ -62,6 +64,7 @@ public class TestPipes extends HamaClust
 
   public static final String EXAMPLES_INSTALL_PROPERTY = "hama.pipes.examples.install";
   public static final String EXAMPLE_SUMMATION_EXEC = "/examples/summation";
+  public static final String EXAMPLE_PIESTIMATOR_EXEC = "/examples/piestimator";
   public static final String EXAMPLE_MATRIXMULTIPLICATION_EXEC = "/examples/matrixmultiplication";
   public static final String EXAMPLE_TMP_OUTPUT = "/tmp/test-example/";
   public static final String HAMA_TMP_OUTPUT = "/tmp/hama-pipes/";
@@ -70,6 +73,7 @@ public class TestPipes extends HamaClust
 
   private HamaConfiguration configuration;
   private static FileSystem fs = null;
+  private String examplesInstallPath;
 
   public TestPipes() {
     configuration = new HamaConfiguration();
@@ -118,21 +122,25 @@ public class TestPipes extends HamaClust
           + " is empty! Skipping TestPipes!");
       return;
     }
+    this.examplesInstallPath = System.getProperty(EXAMPLES_INSTALL_PROPERTY);
 
     // *** Summation Test ***
     summation();
 
+    // *** PiEstimator Test ***
+    piestimation();
+
     // *** MatrixMultiplication Test ***
     matrixMult();
-    
+
     // Remove local temp folder
     cleanup(fs, new Path(EXAMPLE_TMP_OUTPUT));
   }
 
   private void summation() throws Exception {
     // Setup Paths
-    String examplesInstallPath = System.getProperty(EXAMPLES_INSTALL_PROPERTY);
-    Path summationExec = new Path(examplesInstallPath + EXAMPLE_SUMMATION_EXEC);
+    Path summationExec = new Path(this.examplesInstallPath
+        + EXAMPLE_SUMMATION_EXEC);
     Path inputPath = new Path(EXAMPLE_TMP_OUTPUT + "summation/in");
     Path outputPath = new Path(EXAMPLE_TMP_OUTPUT + "summation/out");
 
@@ -144,15 +152,36 @@ public class TestPipes extends HamaClust
         outputPath, 1, this.numOfGroom);
 
     // Verify output
-    verifySummationOutput(configuration, outputPath, sum);
+    verifyOutput(configuration, outputPath, sum.doubleValue(),
+        Math.pow(10, (DOUBLE_PRECISION * -1)));
+
+    // Clean input and output folder
+    cleanup(fs, inputPath);
+    cleanup(fs, outputPath);
+  }
+
+  private void piestimation() throws Exception {
+    // Setup Paths
+    Path piestimatorExec = new Path(this.examplesInstallPath
+        + EXAMPLE_PIESTIMATOR_EXEC);
+    Path inputPath = new Path(EXAMPLE_TMP_OUTPUT + "piestimator/in");
+    Path outputPath = new Path(EXAMPLE_TMP_OUTPUT + "piestimator/out");
+
+    // Run PiEstimator example
+    runProgram(getPiestimatorJob(configuration), piestimatorExec, inputPath,
+        outputPath, 3, this.numOfGroom);
+
+    // Verify output
+    verifyOutput(configuration, outputPath, Math.PI, Math.pow(10, (2 * -1)));
+
     // Clean input and output folder
     cleanup(fs, inputPath);
     cleanup(fs, outputPath);
   }
 
   private void matrixMult() throws Exception {
-    String examplesInstallPath = System.getProperty(EXAMPLES_INSTALL_PROPERTY);
-    Path matrixmultiplicationExec = new Path(examplesInstallPath
+    // Setup Paths
+    Path matrixmultiplicationExec = new Path(this.examplesInstallPath
         + EXAMPLE_MATRIXMULTIPLICATION_EXEC);
 
     Path inputPath = new Path(EXAMPLE_TMP_OUTPUT + "matmult/in");
@@ -192,10 +221,19 @@ public class TestPipes extends HamaClust
     bsp.setInputKeyClass(Text.class);
     bsp.setInputValueClass(Text.class);
     bsp.setOutputFormat(SequenceFileOutputFormat.class);
-    bsp.setOutputKeyClass(Text.class);
+    bsp.setOutputKeyClass(NullWritable.class);
     bsp.setOutputValueClass(DoubleWritable.class);
     bsp.set("bsp.message.class", DoubleWritable.class.getName());
+    return bsp;
+  }
 
+  static BSPJob getPiestimatorJob(HamaConfiguration conf) throws IOException {
+    BSPJob bsp = new BSPJob(conf);
+    bsp.setInputFormat(NullInputFormat.class);
+    bsp.setOutputFormat(SequenceFileOutputFormat.class);
+    bsp.setOutputKeyClass(NullWritable.class);
+    bsp.setOutputValueClass(DoubleWritable.class);
+    bsp.set("bsp.message.class", IntWritable.class.getName());
     return bsp;
   }
 
@@ -211,10 +249,10 @@ public class TestPipes extends HamaClust
 
     bsp.set(Constants.RUNTIME_PARTITIONING_DIR, HAMA_TMP_OUTPUT + "/parts");
     bsp.set("bsp.message.class", PipesKeyValueWritable.class.getName());
-    
+
     bsp.setBoolean(Constants.ENABLE_RUNTIME_PARTITIONING, true);
     bsp.setPartitioner(PipesPartitioner.class);
-    
+
     // sort sent messages
     bsp.set(MessageManager.TRANSFER_QUEUE_TYPE_CLASS,
         "org.apache.hama.bsp.message.queue.SortedMessageTransferProtocol");
@@ -256,7 +294,6 @@ public class TestPipes extends HamaClust
             * rand.nextDouble();
         matrix[i][j] = new BigDecimal(randomValue).setScale(DOUBLE_PRECISION,
             BigDecimal.ROUND_DOWN).doubleValue();
-        // matrix[i][j] = rand.nextInt(9) + 1;
       }
     }
     return matrix;
@@ -350,22 +387,21 @@ public class TestPipes extends HamaClust
     }
   }
 
-  static void verifySummationOutput(HamaConfiguration conf, Path outputPath,
-      BigDecimal sum) throws IOException {
+  static void verifyOutput(HamaConfiguration conf, Path outputPath,
+      double expectedResult, double delta) throws IOException {
     FileStatus[] listStatus = fs.listStatus(outputPath);
     for (FileStatus status : listStatus) {
       if (!status.isDir()) {
         SequenceFile.Reader reader = new SequenceFile.Reader(fs,
             status.getPath(), conf);
-        Text key = new Text();
+        NullWritable key = NullWritable.get();
         DoubleWritable value = new DoubleWritable();
         if (reader.next(key, value)) {
           LOG.info("Output File: " + status.getPath());
           LOG.info("key: '" + key + "' value: '" + value + "' expected: '"
-              + sum.doubleValue() + "'");
-          assertEquals("Expected value: '" + sum + "' != '" + value + "'",
-              sum.doubleValue(), value.get(),
-              Math.pow(10, (DOUBLE_PRECISION * -1)));
+              + expectedResult + "'");
+          assertEquals("Expected value: '" + expectedResult + "' != '" + value
+              + "'", expectedResult, value.get(), delta);
         }
         reader.close();
       }
@@ -417,9 +453,6 @@ public class TestPipes extends HamaClust
     FileInputFormat.setInputPaths(bsp, inputPath);
     FileOutputFormat.setOutputPath(bsp, outputPath);
 
-    Submitter.setIsJavaRecordReader(conf, true);
-    Submitter.setIsJavaRecordWriter(conf, true);
-
     BSPJobClient jobClient = new BSPJobClient(conf);
 
     // Set bspTaskNum



Mime
View raw message