incubator-hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tjungb...@apache.org
Subject svn commit: r1197643 - in /incubator/hama/trunk/core/src: main/java/org/apache/hama/bsp/ test/java/org/apache/hama/bsp/
Date Fri, 04 Nov 2011 16:39:26 GMT
Author: tjungblut
Date: Fri Nov  4 16:39:26 2011
New Revision: 1197643

URL: http://svn.apache.org/viewvc?rev=1197643&view=rev
Log:
Old HAMA behaviour for no input given.

Modified:
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestIOJob.java

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1197643&r1=1197642&r2=1197643&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Fri Nov  4 16:39:26 2011
@@ -323,10 +323,13 @@ public class BSPJobClient extends Config
     fs.mkdirs(submitJobDir);
     short replication = (short) job.getInt("bsp.submit.replication", 10);
 
-    // Create the splits for the job
-    LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
-    job.setNumBspTask(writeSplits(job, submitSplitFile));
-    job.set("bsp.job.split.file", submitSplitFile.toString());
+    // only create the splits if we have an input
+    if (job.get("bsp.input.dir") != null) {
+      // Create the splits for the job
+      LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
+      job.setNumBspTask(writeSplits(job, submitSplitFile));
+      job.set("bsp.job.split.file", submitSplitFile.toString());
+    }
 
     String originalJarPath = job.getJar();
 

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1197643&r1=1197642&r2=1197643&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Fri Nov  4 16:39:26 2011
@@ -51,7 +51,7 @@ public class BSPPeerImpl<KEYIN, VALUEIN,
   public static final Log LOG = LogFactory.getLog(BSPPeerImpl.class);
 
   private final Configuration conf;
-  private final FileSystem dfs;
+  private final FileSystem fs;
   private BSPJob bspJob;
 
   private volatile Server server = null;
@@ -86,7 +86,7 @@ public class BSPPeerImpl<KEYIN, VALUEIN,
    */
   protected BSPPeerImpl() {
     conf = null;
-    dfs = null;
+    fs = null;
   }
 
   /**
@@ -97,7 +97,7 @@ public class BSPPeerImpl<KEYIN, VALUEIN,
    */
   protected BSPPeerImpl(final Configuration conf, FileSystem dfs) {
     this.conf = conf;
-    this.dfs = dfs;
+    this.fs = dfs;
   }
 
   /**
@@ -122,7 +122,7 @@ public class BSPPeerImpl<KEYIN, VALUEIN,
     this.splitClass = splitClass;
     this.split = split;
 
-    this.dfs = FileSystem.get(conf);
+    this.fs = FileSystem.get(conf);
 
     String bindAddress = conf.get(Constants.PEER_HOST,
         Constants.DEFAULT_PEER_HOST);
@@ -158,31 +158,33 @@ public class BSPPeerImpl<KEYIN, VALUEIN,
     syncClient = SyncServiceFactory.getSyncClient(conf);
     syncClient.init(conf, taskId.getJobID(), taskId);
 
-    InputSplit inputSplit = null;
-    // reinstantiate the split
-    try {
-      inputSplit = (InputSplit) ReflectionUtils.newInstance(getConfiguration()
-          .getClassByName(splitClass), getConfiguration());
-    } catch (ClassNotFoundException exp) {
-      IOException wrap = new IOException("Split class " + splitClass
-          + " not found");
-      wrap.initCause(exp);
-      throw wrap;
-    }
+    // just read input if the user defined one
+    if (conf.get("bsp.input.dir") != null) {
+      InputSplit inputSplit = null;
+      // reinstantiate the split
+      try {
+        inputSplit = (InputSplit) ReflectionUtils.newInstance(
+            getConfiguration().getClassByName(splitClass), getConfiguration());
+      } catch (ClassNotFoundException exp) {
+        IOException wrap = new IOException("Split class " + splitClass
+            + " not found");
+        wrap.initCause(exp);
+        throw wrap;
+      }
 
-    DataInputBuffer splitBuffer = new DataInputBuffer();
-    splitBuffer.reset(split.getBytes(), 0, split.getLength());
-    inputSplit.readFields(splitBuffer);
+      DataInputBuffer splitBuffer = new DataInputBuffer();
+      splitBuffer.reset(split.getBytes(), 0, split.getLength());
+      inputSplit.readFields(splitBuffer);
 
-    in = bspJob.getInputFormat().getRecordReader(inputSplit, bspJob);
+      in = bspJob.getInputFormat().getRecordReader(inputSplit, bspJob);
+    }
 
     // just output something when the user configured it
     if (conf.get("bsp.output.dir") != null) {
       Path outdir = new Path(conf.get("bsp.output.dir"),
           Task.getOutputName(partition));
-
-      outWriter = bspJob.getOutputFormat().getRecordWriter(dfs, bspJob,
-          outdir.makeQualified(dfs).toString());
+      outWriter = bspJob.getOutputFormat().getRecordWriter(fs, bspJob,
+          outdir.makeQualified(fs).toString());
       final RecordWriter<KEYOUT, VALUEOUT> finalOut = outWriter;
 
       collector = new OutputCollector<KEYOUT, VALUEOUT>() {
@@ -236,7 +238,7 @@ public class BSPPeerImpl<KEYIN, VALUEIN,
   void checkpoint(String checkpointedPath, BSPMessageBundle bundle) {
     FSDataOutputStream out = null;
     try {
-      out = this.dfs.create(new Path(checkpointedPath));
+      out = this.fs.create(new Path(checkpointedPath));
       bundle.write(out);
     } catch (IOException ioe) {
       LOG.warn("Fail checkpointing messages to " + checkpointedPath, ioe);
@@ -327,8 +329,12 @@ public class BSPPeerImpl<KEYIN, VALUEIN,
   }
 
   public void close() throws Exception {
-    in.close();
-    outWriter.close();
+    if (in != null) {
+      in.close();
+    }
+    if (outWriter != null) {
+      outWriter.close();
+    }
     this.clear();
     syncClient.close();
     if (server != null) {

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java?rev=1197643&r1=1197642&r2=1197643&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java Fri Nov  4 16:39:26 2011
@@ -37,7 +37,7 @@ public class BSPTask extends Task {
   public static final Log LOG = LogFactory.getLog(BSPTask.class);
 
   private BSPJob conf;
-  BytesWritable split = new BytesWritable();
+  BytesWritable split;
   String splitClass;
 
   public BSPTask() {
@@ -62,7 +62,6 @@ public class BSPTask extends Task {
   @Override
   public void run(BSPJob job, BSPPeerImpl<?, ?, ?, ?> bspPeer,
       BSPPeerProtocol umbilical) throws IOException {
-
     try {
       runBSP(job, bspPeer, split, umbilical);
     } catch (InterruptedException e) {
@@ -115,16 +114,26 @@ public class BSPTask extends Task {
   @Override
   public void write(DataOutput out) throws IOException {
     super.write(out);
-    Text.writeString(out, splitClass);
-    split.write(out);
-    split = null;
+    if (split != null) {
+      out.writeBoolean(true);
+      Text.writeString(out, splitClass);
+      split.write(out);
+      split = null;
+    } else {
+      out.writeBoolean(false);
+    }
   }
 
   @Override
   public void readFields(DataInput in) throws IOException {
     super.readFields(in);
-    splitClass = Text.readString(in);
-    split.readFields(in);
+    if (in.readBoolean()) {
+      splitClass = Text.readString(in);
+      if(split == null){
+        split = new BytesWritable();
+      }
+      split.readFields(in);
+    }
   }
 
 }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java?rev=1197643&r1=1197642&r2=1197643&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java Fri Nov  4 16:39:26 2011
@@ -83,8 +83,8 @@ class JobInProgress {
     this.jobFile = jobFile;
     this.master = master;
 
-    this.status = new JobStatus(jobId, null, 0L, 0L, JobStatus.State.PREP
-        .value());
+    this.status = new JobStatus(jobId, null, 0L, 0L,
+        JobStatus.State.PREP.value());
     this.startTime = System.currentTimeMillis();
     this.superstepCounter = 0;
     this.restartCount = 0;
@@ -99,9 +99,13 @@ class JobInProgress {
     fs.copyToLocalFile(jobFile, localJobFile);
     BSPJob job = new BSPJob(jobId, localJobFile.toString());
     this.jobSplit = job.getConf().get("bsp.job.split.file");
+    
+    // default is 1, because with zero, we will hang in infinity
+    this.numBSPTasks = job.getInt("bsp.peers.num", 1);
 
-    this.profile = new JobProfile(job.getUser(), jobId, jobFile.toString(), job
-        .getJobName());
+
+    this.profile = new JobProfile(job.getUser(), jobId, jobFile.toString(),
+        job.getJobName());
 
     this.setJobName(job.getJobName());
 
@@ -185,22 +189,30 @@ class JobInProgress {
 
     Path sysDir = new Path(this.master.getSystemDir());
     FileSystem fs = sysDir.getFileSystem(conf);
-    DataInputStream splitFile = fs.open(new Path(this.jobSplit));
+    if (jobSplit != null) {
+      DataInputStream splitFile = fs.open(new Path(this.jobSplit));
 
-    BSPJobClient.RawSplit[] splits;
-    try {
-      splits = BSPJobClient.readSplitFile(splitFile);
-    } finally {
-      splitFile.close();
-    }
-    numBSPTasks = splits.length;
-    LOG.info("num BSPTasks: " + numBSPTasks);
-
-    // adjust number of BSP tasks to actual number of splits
-    this.tasks = new TaskInProgress[numBSPTasks];
-    for (int i = 0; i < numBSPTasks; i++) {
-      tasks[i] = new TaskInProgress(getJobID(), this.jobFile.toString(),
-          splits[i], this.master, this.conf, this, i);
+      BSPJobClient.RawSplit[] splits;
+      try {
+        splits = BSPJobClient.readSplitFile(splitFile);
+      } finally {
+        splitFile.close();
+      }
+      numBSPTasks = splits.length;
+      LOG.info("num BSPTasks: " + numBSPTasks);
+
+      // adjust number of BSP tasks to actual number of splits
+      this.tasks = new TaskInProgress[numBSPTasks];
+      for (int i = 0; i < numBSPTasks; i++) {
+        tasks[i] = new TaskInProgress(getJobID(), this.jobFile.toString(),
+            splits[i], this.master, this.conf, this, i);
+      }
+    } else {
+      this.tasks = new TaskInProgress[numBSPTasks];
+      for (int i = 0; i < numBSPTasks; i++) {
+        tasks[i] = new TaskInProgress(getJobID(), this.jobFile.toString(),
+            null, this.master, this.conf, this, i);
+      }
     }
 
     // Update job status
@@ -258,9 +270,9 @@ class JobInProgress {
     }
 
     if (allDone) {
-      this.status = new JobStatus(this.status.getJobID(), this.profile
-          .getUser(), superstepCounter, superstepCounter, superstepCounter,
-          JobStatus.SUCCEEDED, superstepCounter);
+      this.status = new JobStatus(this.status.getJobID(),
+          this.profile.getUser(), superstepCounter, superstepCounter,
+          superstepCounter, JobStatus.SUCCEEDED, superstepCounter);
       this.finishTime = System.currentTimeMillis();
       this.status.setFinishTime(this.finishTime);
 
@@ -292,9 +304,9 @@ class JobInProgress {
     }
 
     if (allDone) {
-      this.status = new JobStatus(this.status.getJobID(), this.profile
-          .getUser(), superstepCounter, superstepCounter, superstepCounter,
-          JobStatus.FAILED, superstepCounter);
+      this.status = new JobStatus(this.status.getJobID(),
+          this.profile.getUser(), superstepCounter, superstepCounter,
+          superstepCounter, JobStatus.FAILED, superstepCounter);
       this.finishTime = System.currentTimeMillis();
       this.status.setFinishTime(this.finishTime);
 

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java?rev=1197643&r1=1197642&r2=1197643&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java Fri Nov  4 16:39:26 2011
@@ -128,9 +128,13 @@ class TaskInProgress {
           + " attempts for the tip '" + getTIPId() + "'");
       return null;
     }
-
-    String splitClass = rawSplit.getClassName();
-    BytesWritable split = rawSplit.getBytes();
+    
+    String splitClass = null;
+    BytesWritable split = null;
+    if(rawSplit != null){
+      splitClass = rawSplit.getClassName();
+      split = rawSplit.getBytes();
+    }
     
     t = new BSPTask(jobId, jobFile, taskid, partition, splitClass, split);
     activeTasks.put(taskid, status.getGroomName());

Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java?rev=1197643&r1=1197642&r2=1197643&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java Fri Nov  4 16:39:26 2011
@@ -48,7 +48,7 @@ public class TestBSPMasterGroomServer ex
     configuration.setInt(Constants.ZOOKEEPER_CLIENT_PORT, 21810);
     configuration.set("hama.sync.client.class",
         org.apache.hama.bsp.sync.ZooKeeperSyncClientImpl.class
-        .getCanonicalName());
+            .getCanonicalName());
   }
 
   public void setUp() throws Exception {
@@ -67,7 +67,6 @@ public class TestBSPMasterGroomServer ex
     ClusterStatus cluster = jobClient.getClusterStatus(false);
     assertEquals(this.numOfGroom, cluster.getGroomServers());
     bsp.setNumBspTask(2);
-    bsp.setInputFormat(NullInputFormat.class);
     
     FileSystem fileSys = FileSystem.get(conf);
 

Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestIOJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestIOJob.java?rev=1197643&r1=1197642&r2=1197643&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestIOJob.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestIOJob.java Fri Nov  4 16:39:26 2011
@@ -72,7 +72,7 @@ public class TestIOJob extends HamaClust
     FileSystem fileSys = FileSystem.get(conf);
 
     if (bsp.waitForCompletion(true)) {
-      TestBSPMasterGroomServer.checkOutput(fileSys, conf, 2);
+      TestBSPMasterGroomServer.checkOutput(fileSys, conf,2);
     }
     LOG.info("Client finishes execution job.");
   }



Mime
View raw message