hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1679312 - in /hama/trunk: conf/ core/src/main/java/org/apache/hama/bsp/ core/src/test/java/org/apache/hama/bsp/ graph/src/main/java/org/apache/hama/graph/
Date Thu, 14 May 2015 05:07:45 GMT
Author: edwardyoon
Date: Thu May 14 05:07:44 2015
New Revision: 1679312

URL: http://svn.apache.org/r1679312
Log:
HAMA-956: improve the runtime partitioner

Modified:
    hama/trunk/conf/hama-default.xml
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java

Modified: hama/trunk/conf/hama-default.xml
URL: http://svn.apache.org/viewvc/hama/trunk/conf/hama-default.xml?rev=1679312&r1=1679311&r2=1679312&view=diff
==============================================================================
--- hama/trunk/conf/hama-default.xml (original)
+++ hama/trunk/conf/hama-default.xml Thu May 14 05:07:44 2015
@@ -204,23 +204,11 @@
   <property>
     <name>bsp.input.runtime.partitioning</name>
     <value>true</value>
-    <description>Basically, we provides a data partitioning program based on BSP job,

+    <description>Basically, we provides a input data partitioning program based on
BSP job, 
     which you can use without any extra program. Set this property to false if you 
     want to use the custom partition program.
     </description>
   </property>
-  <property>
-    <name>bsp.input.runtime.partitioning.sort.mb</name>
-    <value>50</value>
-    <description>The total amount of buffer memory in MB.
-    </description>
-  </property>
-  <property>
-    <name>bsp.input.runtime.partitioning.sort.factor</name>
-    <value>10</value>
-    <description>The maximum number of streams to merge at once; the default is 10.
-    </description>
-  </property>
   
   <property>
     <name>io.serializations</name>

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1679312&r1=1679311&r2=1679312&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Thu May 14 05:07:44
2015
@@ -45,6 +45,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.Text;
@@ -346,25 +347,26 @@ public class BSPJobClient extends Config
       InputSplit[] splits = job.getInputFormat().getSplits(job,
           (maxTasks > configured) ? configured : maxTasks);
 
+      if (job.getConfiguration().getBoolean(
+          Constants.ENABLE_RUNTIME_PARTITIONING, false)) {
+        job = partition(job, splits, maxTasks);
+        maxTasks = job.getInt("hama.partition.count", maxTasks);
+      }
+
+      if (job.getBoolean("input.has.partitioned", false)) {
+        splits = job.getInputFormat().getSplits(job, maxTasks);
+      }
+
       if (maxTasks < splits.length) {
         throw new IOException(
             "Job failed! The number of splits has exceeded the number of max tasks. The number
of splits: "
                 + splits.length + ", The number of max tasks: " + maxTasks);
       }
 
-      /*
-      FIXME now graph job doesn't use this runtime input partitioning
-      Should we support this feature at BSP framework level?
-      
-      if(job.getConfiguration().getBoolean(Constants.ENABLE_RUNTIME_PARTITIONING, false))
{
-        job = partition(job, splits, maxTasks);
-        maxTasks = job.getInt("hama.partition.count", maxTasks);
-      }
-      */
-      
       int numOfSplits = writeSplits(job, splits, submitSplitFile, maxTasks);
       if (numOfSplits > configured
-          || !job.getConfiguration().getBoolean(Constants.FORCE_SET_BSP_TASKS, false)) {
+          || !job.getConfiguration().getBoolean(Constants.FORCE_SET_BSP_TASKS,
+              false)) {
         job.setNumBspTask(numOfSplits);
       }
 
@@ -408,6 +410,99 @@ public class BSPJobClient extends Config
     return launchJob(jobId, job, submitJobFile, fs);
   }
 
+  protected BSPJob partition(BSPJob job, InputSplit[] splits, int maxTasks)
+      throws IOException {
+    String inputPath = job.getConfiguration().get(Constants.JOB_INPUT_DIR);
+    Path partitionDir = new Path("/tmp/hama-parts/" + job.getJobID() + "/");
+    if (fs.exists(partitionDir)) {
+      fs.delete(partitionDir, true);
+    }
+
+    if (job.get("bsp.partitioning.runner.job") != null) {
+      return job;
+    }// Early exit for the partitioner job.
+
+    if (inputPath != null) {
+      int numSplits = splits.length;
+      int numTasks = job.getConfiguration().getInt("bsp.peers.num", 0);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(" numTasks = "
+            + numTasks
+            + " numSplits = "
+            + numSplits
+            + " enable = "
+            + (job.getConfiguration().getBoolean(
+                Constants.ENABLE_RUNTIME_PARTITIONING, false)
+                + " class = " + job.getConfiguration().get(
+                Constants.RUNTIME_PARTITIONING_CLASS)));
+      }
+
+      if (numTasks == 0) {
+        numTasks = numSplits;
+      }
+
+      if (job.getConfiguration().getBoolean(
+          Constants.ENABLE_RUNTIME_PARTITIONING, false)
+          && job.getConfiguration().get(Constants.RUNTIME_PARTITIONING_CLASS) !=
null) {
+
+        HamaConfiguration conf = new HamaConfiguration(job.getConfiguration());
+
+        if (job.getConfiguration().get(Constants.RUNTIME_PARTITIONING_DIR) != null) {
+          conf.set(Constants.RUNTIME_PARTITIONING_DIR, job.getConfiguration()
+              .get(Constants.RUNTIME_PARTITIONING_DIR));
+        }
+
+        conf.set(Constants.RUNTIME_PARTITIONING_CLASS,
+            job.get(Constants.RUNTIME_PARTITIONING_CLASS));
+        BSPJob partitioningJob = new BSPJob(conf);
+        partitioningJob.setJobName("Runtime partitioning job for "
+            + partitioningJob.getJobName());
+        LOG.debug("partitioningJob input: "
+            + partitioningJob.get(Constants.JOB_INPUT_DIR));
+
+        partitioningJob.setBoolean(Constants.FORCE_SET_BSP_TASKS, true);
+        partitioningJob.setInputFormat(job.getInputFormat().getClass());
+        partitioningJob.setInputKeyClass(job.getInputKeyClass());
+        partitioningJob.setInputValueClass(job.getInputValueClass());
+
+        partitioningJob.setOutputFormat(SequenceFileOutputFormat.class);
+        partitioningJob.setOutputKeyClass(job.getInputKeyClass());
+        partitioningJob.setOutputValueClass(job.getInputValueClass());
+
+        partitioningJob.setBspClass(PartitioningRunner.class);
+        partitioningJob.setMessageClass(MapWritable.class);
+        partitioningJob.set("bsp.partitioning.runner.job", "true");
+        partitioningJob.getConfiguration().setBoolean(
+            Constants.ENABLE_RUNTIME_PARTITIONING, false);
+        partitioningJob.setOutputPath(partitionDir);
+
+        boolean isPartitioned = false;
+        try {
+          isPartitioned = partitioningJob.waitForCompletion(true);
+        } catch (InterruptedException e) {
+          LOG.error("Interrupted partitioning run-time.", e);
+        } catch (ClassNotFoundException e) {
+          LOG.error("Class not found error partitioning run-time.", e);
+        }
+
+        if (isPartitioned) {
+          if (job.getConfiguration().get(Constants.RUNTIME_PARTITIONING_DIR) != null) {
+            job.setInputPath(new Path(conf
+                .get(Constants.RUNTIME_PARTITIONING_DIR)));
+          } else {
+            job.setInputPath(partitionDir);
+          }
+          job.setBoolean("input.has.partitioned", true);
+          job.setInputFormat(NonSplitSequenceFileInputFormat.class);
+        } else {
+          LOG.error("Error partitioning the input path.");
+          throw new IOException("Runtime partition failed for the job.");
+        }
+      }
+    }
+    return job;
+  }
+
   protected RunningJob launchJob(BSPJobID jobId, BSPJob job,
       Path submitJobFile, FileSystem fs) throws IOException {
     //

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java?rev=1679312&r1=1679311&r2=1679312&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java Thu May 14 05:07:44
2015
@@ -25,15 +25,9 @@ import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.MapWritable;
-import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hama.Constants;
 import org.apache.hama.bsp.sync.SyncException;
@@ -41,36 +35,24 @@ import org.apache.hama.commons.util.KeyV
 import org.apache.hama.pipes.PipesPartitioner;
 
 public class PartitioningRunner extends
-    BSP<Writable, Writable, Writable, Writable, NullWritable> {
+    BSP<Writable, Writable, Writable, Writable, MapWritable> {
   public static final Log LOG = LogFactory.getLog(PartitioningRunner.class);
 
   private Configuration conf;
-  private int desiredNum;
-  private FileSystem fs = null;
-  private Path partitionDir;
   private RecordConverter converter;
   private PipesPartitioner<?, ?> pipesPartitioner = null;
 
   @Override
   public final void setup(
-      BSPPeer<Writable, Writable, Writable, Writable, NullWritable> peer)
+      BSPPeer<Writable, Writable, Writable, Writable, MapWritable> peer)
       throws IOException, SyncException, InterruptedException {
 
     this.conf = peer.getConfiguration();
-    this.desiredNum = conf.getInt(Constants.RUNTIME_DESIRED_PEERS_COUNT, 1);
-
-    this.fs = FileSystem.get(conf);
 
     converter = ReflectionUtils.newInstance(conf.getClass(
         Constants.RUNTIME_PARTITION_RECORDCONVERTER,
         DefaultRecordConverter.class, RecordConverter.class), conf);
     converter.setup(conf);
-
-    if (conf.get(Constants.RUNTIME_PARTITIONING_DIR) == null) {
-      this.partitionDir = new Path(conf.get("bsp.output.dir"));
-    } else {
-      this.partitionDir = new Path(conf.get(Constants.RUNTIME_PARTITIONING_DIR));
-    }
   }
 
   /**
@@ -97,10 +79,9 @@ public class PartitioningRunner extends
         throws IOException;
 
     public int getPartitionId(KeyValuePair<Writable, Writable> inputRecord,
-        @SuppressWarnings("rawtypes")
-        Partitioner partitioner, Configuration conf,
-        @SuppressWarnings("rawtypes")
-        BSPPeer peer, int numTasks);
+        @SuppressWarnings("rawtypes") Partitioner partitioner,
+        Configuration conf, @SuppressWarnings("rawtypes") BSPPeer peer,
+        int numTasks);
   }
 
   /**
@@ -117,10 +98,9 @@ public class PartitioningRunner extends
     @SuppressWarnings("unchecked")
     @Override
     public int getPartitionId(KeyValuePair<Writable, Writable> outputRecord,
-        @SuppressWarnings("rawtypes")
-        Partitioner partitioner, Configuration conf,
-        @SuppressWarnings("rawtypes")
-        BSPPeer peer, int numTasks) {
+        @SuppressWarnings("rawtypes") Partitioner partitioner,
+        Configuration conf, @SuppressWarnings("rawtypes") BSPPeer peer,
+        int numTasks) {
       return Math.abs(partitioner.getPartition(outputRecord.getKey(),
           outputRecord.getValue(), numTasks));
     }
@@ -136,15 +116,13 @@ public class PartitioningRunner extends
   @Override
   @SuppressWarnings({ "rawtypes" })
   public void bsp(
-      BSPPeer<Writable, Writable, Writable, Writable, NullWritable> peer)
+      BSPPeer<Writable, Writable, Writable, Writable, MapWritable> peer)
       throws IOException, SyncException, InterruptedException {
 
-    int peerNum = peer.getNumPeers();
     Partitioner partitioner = getPartitioner();
     KeyValuePair<Writable, Writable> rawRecord = null;
     KeyValuePair<Writable, Writable> convertedRecord = null;
 
-    Class convertedKeyClass = null;
     Class rawKeyClass = null;
     Class rawValueClass = null;
     MapWritable raw = null;
@@ -160,175 +138,35 @@ public class PartitioningRunner extends
         throw new IOException("The converted record can't be null.");
       }
 
-      Writable convertedKey = convertedRecord.getKey();
-      convertedKeyClass = convertedKey.getClass();
-
       int index = converter.getPartitionId(convertedRecord, partitioner, conf,
-          peer, desiredNum);
-
-      if (!writerCache.containsKey(index)) {
-        Path destFile = new Path(partitionDir + "/part-" + index + "/file-"
-            + peer.getPeerIndex());
-        SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
-            destFile, convertedKeyClass, MapWritable.class,
-            CompressionType.NONE);
-        writerCache.put(index, writer);
-      }
+          peer, peer.getNumPeers());
 
       raw = new MapWritable();
       raw.put(rawRecord.getKey(), rawRecord.getValue());
-
-      writerCache.get(index).append(convertedKey, raw);
-    }
-
-    for (SequenceFile.Writer w : writerCache.values()) {
-      w.close();
+      peer.send(peer.getPeerName(index), raw);
     }
 
     peer.sync();
-    FileStatus[] status = fs.listStatus(partitionDir);
-    // Call sync() one more time to avoid concurrent access
-    peer.sync();
-
-    for (FileStatus stat : status) {
-      int partitionID = Integer
-          .parseInt(stat.getPath().getName().split("[-]")[1]);
-
-      if (getMergeProcessorID(partitionID, peerNum) == peer.getPeerIndex()) {
-        Path destinationFilePath = new Path(partitionDir + "/"
-            + getPartitionName(partitionID));
-
-        FileStatus[] files = fs.listStatus(stat.getPath());
-        if (convertedRecord.getKey() instanceof WritableComparable
-            && conf.getBoolean(Constants.PARTITION_SORT_BY_KEY, false)) {
-          mergeSortedFiles(files, destinationFilePath, convertedKeyClass,
-              rawKeyClass, rawValueClass);
-        } else {
-          mergeFiles(files, destinationFilePath, convertedKeyClass,
-              rawKeyClass, rawValueClass);
-        }
-        fs.delete(stat.getPath(), true);
-      }
-    }
-  }
-
-  @SuppressWarnings("rawtypes")
-  public Map<Integer, KeyValuePair<WritableComparable, MapWritable>> candidates
= new HashMap<Integer, KeyValuePair<WritableComparable, MapWritable>>();
-
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-  private void mergeSortedFiles(FileStatus[] status, Path destinationFilePath,
-      Class convertedKeyClass, Class rawKeyClass, Class rawValueClass)
-      throws IOException {
-    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
-        destinationFilePath, rawKeyClass, rawValueClass, CompressionType.NONE);
-    WritableComparable convertedKey;
-    MapWritable value;
-
-    Map<Integer, SequenceFile.Reader> readers = new HashMap<Integer, SequenceFile.Reader>();
-    for (int i = 0; i < status.length; i++) {
-      SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs,
-          convertedKeyClass, MapWritable.class, conf);
-      sorter.setMemory(conf
-          .getInt("bsp.input.runtime.partitioning.sort.mb", 50) * 1024 * 1024);
-      sorter.setFactor(conf.getInt(
-          "bsp.input.runtime.partitioning.sort.factor", 10));
-      sorter.sort(status[i].getPath(), status[i].getPath().suffix(".sorted"));
-
-      readers.put(i,
-          new SequenceFile.Reader(fs, status[i].getPath().suffix(".sorted"),
-              conf));
-    }
-
-    for (int i = 0; i < readers.size(); i++) {
-      convertedKey = (WritableComparable) ReflectionUtils.newInstance(
-          convertedKeyClass, conf);
-      value = new MapWritable();
-
-      readers.get(i).next(convertedKey, value);
-      candidates.put(i, new KeyValuePair(convertedKey, value));
-    }
-
-    while (readers.size() > 0) {
-      convertedKey = (WritableComparable) ReflectionUtils.newInstance(
-          convertedKeyClass, conf);
-      value = new MapWritable();
-
-      int readerIndex = 0;
-      WritableComparable firstKey = null;
-      MapWritable rawRecord = null;
-
-      for (Map.Entry<Integer, KeyValuePair<WritableComparable, MapWritable>>
keys : candidates
-          .entrySet()) {
-        if (firstKey == null) {
-          readerIndex = keys.getKey();
-          firstKey = keys.getValue().getKey();
-          rawRecord = (MapWritable) keys.getValue().getValue();
-        } else {
-          WritableComparable currentKey = keys.getValue().getKey();
-          if (firstKey.compareTo(currentKey) > 0) {
-            readerIndex = keys.getKey();
-            firstKey = currentKey;
-            rawRecord = (MapWritable) keys.getValue().getValue();
-          }
-        }
-      }
-
-      for (Map.Entry<Writable, Writable> e : rawRecord.entrySet()) {
-        writer.append(e.getKey(), e.getValue());
-      }
 
-      candidates.remove(readerIndex);
+    MapWritable record;
 
-      if (readers.get(readerIndex).next(convertedKey, value)) {
-        candidates.put(readerIndex, new KeyValuePair(convertedKey, value));
-      } else {
-        readers.get(readerIndex).close();
-        readers.remove(readerIndex);
+    while ((record = peer.getCurrentMessage()) != null) {
+      for (Map.Entry<Writable, Writable> e : record.entrySet()) {
+        peer.write(e.getKey(), e.getValue());
       }
     }
 
-    candidates.clear();
-    writer.close();
-  }
-
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-  private void mergeFiles(FileStatus[] status, Path destinationFilePath,
-      Class convertedKeyClass, Class rawKeyClass, Class rawValueClass)
-      throws IOException {
-    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
-        destinationFilePath, rawKeyClass, rawValueClass, CompressionType.NONE);
-    Writable key;
-    MapWritable rawRecord;
-
-    for (int i = 0; i < status.length; i++) {
-      SequenceFile.Reader reader = new SequenceFile.Reader(fs,
-          status[i].getPath(), conf);
-      key = (Writable) ReflectionUtils.newInstance(convertedKeyClass, conf);
-      rawRecord = new MapWritable();
-
-      while (reader.next(key, rawRecord)) {
-        for (Map.Entry<Writable, Writable> e : rawRecord.entrySet()) {
-          writer.append(e.getKey(), e.getValue());
-        }
-      }
-      reader.close();
-    }
-    writer.close();
   }
 
   @Override
   public void cleanup(
-      BSPPeer<Writable, Writable, Writable, Writable, NullWritable> peer)
+      BSPPeer<Writable, Writable, Writable, Writable, MapWritable> peer)
       throws IOException {
     if (this.pipesPartitioner != null) {
       this.pipesPartitioner.cleanup();
     }
   }
 
-  public static int getMergeProcessorID(int partitionID, int peerNum) {
-    return partitionID % peerNum;
-  }
-
   @SuppressWarnings("rawtypes")
   public Partitioner getPartitioner() {
     Class<? extends Partitioner> partitionerClass = conf.getClass(
@@ -355,8 +193,4 @@ public class PartitioningRunner extends
     return partitioner;
   }
 
-  private static String getPartitionName(int i) {
-    return "part-" + String.valueOf(100000 + i).substring(1, 6);
-  }
-
 }

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java?rev=1679312&r1=1679311&r2=1679312&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java Thu
May 14 05:07:44 2015
@@ -162,9 +162,8 @@ public class TestKeyValueTextInputFormat
       
       job.setBoolean(Constants.ENABLE_RUNTIME_PARTITIONING, true);
       job.setPartitioner(HashPartitioner.class);
-
-      // FIXME see 362 line at BSPJobClient.java
-      job.setNumBspTask(1);
+      
+      job.setNumBspTask(2);
       job.setInputPath(dataPath);
       job.setInputFormat(KeyValueTextInputFormat.class);
       job.setInputKeyClass(Text.class);

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java?rev=1679312&r1=1679311&r2=1679312&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java Thu May 14 05:07:44
2015
@@ -88,15 +88,6 @@ public class TestPartitioning extends Ha
 
     FileSystem fs = FileSystem.get(conf);
     fs.delete(OUTPUT_PATH, true);
-    
-    getMergeProcessorID();
-  }
-
-  public void getMergeProcessorID() {
-    int peerNum = 6;
-    for (int partitionID = 0; partitionID < 8; partitionID++) {
-      assertTrue(PartitioningRunner.getMergeProcessorID(partitionID, peerNum) < peerNum);
-    }
   }
 
   public static class PartionedBSP extends

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java?rev=1679312&r1=1679311&r2=1679312&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java Thu May 14 05:07:44
2015
@@ -132,7 +132,6 @@ public class GraphJob extends BSPJob {
     ensureState(JobState.DEFINE);
     conf.setClass(Constants.RUNTIME_PARTITION_RECORDCONVERTER, cls,
         RecordConverter.class);
-    conf.setBoolean(Constants.ENABLE_RUNTIME_PARTITIONING, true);
   }
 
   /**
@@ -156,7 +155,6 @@ public class GraphJob extends BSPJob {
   public void setPartitioner(
       @SuppressWarnings("rawtypes") Class<? extends Partitioner> theClass) {
     super.setPartitioner(theClass);
-    conf.setBoolean(Constants.ENABLE_RUNTIME_PARTITIONING, true);
   }
 
   @Override



Mime
View raw message